Delete support for sink
Psykepro opened this issue · 10 comments
I have a case where I want the Kafka ScyllaDB Sink Connector to be able to catch deletions in the source table. Is this possible?
I'm not quite sure what you mean by "source table" (in the context of the Sink Connector), but upon receiving a Kafka record with a null value (a tombstone record), the Sink Connector performs a DELETE on the destination Scylla cluster, using the record's key as the Scylla primary key. This behavior is controlled by the scylladb.deletes.enabled parameter (see CONFIG.md; enabled by default).
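For reference, a minimal sketch of a sink configuration with deletes explicitly enabled; the topic, contact point, and keyspace below are placeholders, not values from this issue:

{
  "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
  "topics": "my_topic",
  "scylladb.contact.points": "scylla.example.com",
  "scylladb.keyspace": "my_keyspace",
  "scylladb.deletes.enabled": "true"
}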
Yes, I actually meant from the Kafka topic. But is the JDBC Source Connector able to catch the deletion of a record in the source table and produce a null-value message to the Kafka topic by default, or do I need to enable something? We are doing an Oracle-to-Scylla sink.
It seems that Confluent's JDBC Source Connector does not support deletions: in incrementing mode it periodically executes SELECT * FROM table WHERE incrementingColumn > lastIncrementedValue (or filters on a timestamp column), so a deleted row simply never appears in the result set and the connector has no way to observe the deletion.
If you want to look at other Source Connectors for Oracle, maybe the Debezium Connector for Oracle could work:
https://debezium.io/documentation/reference/1.4/connectors/oracle.html
To work well with the Sink Connector, you would probably have to enable "New Record State Extraction": Kafka records generated by the Debezium Connector for Oracle carry a lot of metadata, and this feature flattens them (and also generates a null value for DELETEs):
https://debezium.io/documentation/reference/1.4/configuration/event-flattening.html
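As a concrete example, a minimal sketch of the relevant SMT settings, assuming the transform is named unwrap (see the event-flattening documentation above for the full option list):

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false"
}

With drop.tombstones set to false, the tombstone records that the Sink Connector's DELETE handling relies on are kept in the topic.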
Thank you very much, will try this! :)
Trying now with Debezium, but I'm stuck on an error when consuming the messages from the Debezium Oracle Source Connector:
Log excerpt (Feb 26, 2021 @ 19:22:15.874, from the be-debezium container):

Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 2:47 no viable alternative at input ''
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:52)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:22)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
    at io.connect.scylladb.ScyllaDbSessionImpl.executeStatement(ScyllaDbSessionImpl.java:56)
    at io.connect.scylladb.ScyllaDbSchemaBuilder.alter(ScyllaDbSchemaBuilder.java:216)
    at io.connect.scylladb.ScyllaDbSchemaBuilder.build(ScyllaDbSchemaBuilder.java:243)
    at io.connect.scylladb.ScyllaDbSessionImpl.createOrAlterTable(ScyllaDbSessionImpl.java:97)
    at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:85)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
    ... 10 more
Using image: debezium/connect:1.5
Debezium Oracle Source Connector Config:
{
  "connector.class": "io.debezium.connector.oracle.OracleConnector",
  "topic.prefix": "",
  "tasks.max": "1",
  "database.server.name": "TPFUAT2",
  "table.include.list": "xxx.BE__CLE_THIRD_PARTIES",
  "numeric.mapping": "best_fit",
  "database.url": "jdbc:oracle:thin:@xx.xx.xx.xx:xxxx:TPFUAT2",
  "database.user": "xxx",
  "database.schema": "xxx",
  "database.password": "xxxx",
  "database.dbname": "TPFUAT2",
  "database.history.kafka.bootstrap.servers": "kafka.internal.dev.xxxx.com:9092",
  "database.history.kafka.topic": "schema-changes.BE__CLE_THIRD_PARTIES",
  "database.connection.adapter": "logminer",
  "transforms": "unwrap,route",
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": "THP_SERIAL",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://be-kafka-schema-registry-svc",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://be-kafka-schema-registry-svc",
  "pk.fields": "THP_SERIAL",
  "pk.mode": "record_value",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": false,
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3",
  "internal.log.mining.dml.parser": "legacy"
}
Confluent ScyllaDB Sink Connector Configuration:
{
  "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
  "topics": "BE__CLE_THIRD_PARTIES",
  "tasks.max": "2",
  "scylladb.contact.points": "scylla.internal.dev.xxxx.com",
  "scylladb.keyspace.create.enabled": "true",
  "scylladb.keyspace": "be__cle",
  "scylladb.write.mode": "Upsert",
  "scylladb.consistency.level": "QUORUM",
  "scylladb.keyspace.replication.factor": "3",
  "scylladb.table.manage.enabled": "true",
  "scylladb.execute.timeout.ms": "60000",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://be-kafka-schema-registry-svc",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://be-kafka-schema-registry-svc",
  "transforms": "ReplaceField,unwrap",
  "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.ReplaceField.blacklist": "SYNC_FLAG",
  "scylladb.security.enabled": "true",
  "scylladb.username": "xxxx",
  "scylladb.password": "xxxx",
  "pk.fields": "THP_SERIAL",
  "pk.mode": "record_value",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": false,
  "behavior.on.null.values": "delete"
}
Please help.
@Psykepro could you post an example Kafka message that is sent to a topic that the Sink Connector reads from? (As this is in Avro format, you could probably use kafka-avro-console-consumer to get a human-readable representation.)
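For example, a sketch of such an invocation, reusing the bootstrap server, topic, and Schema Registry URL from the configs above (adjust to your environment):

kafka-avro-console-consumer \
  --bootstrap-server kafka.internal.dev.xxxx.com:9092 \
  --topic BE__CLE_THIRD_PARTIES \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://be-kafka-schema-registry-svc

Printing the key alongside the value makes it easy to check whether DELETEs arrive as tombstones (a key with a null value).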
I can't post the message as it contains sensitive data, but on Monday I'll check all of the messages and maybe send censored data. The thing is, I was able to process all of the records from that table without problems when using the regular Kafka JDBC Source Connector for Oracle and the regular Kafka ScyllaDB Sink Connector.
@avelanarius I can't send a message as it contains PII data :S. Is there any other way to debug it? Maybe knowing what exactly to look for with such an error?
Is the ScyllaDB Sink Connector actually integrated to work with Debezium in the first place? From what I see in the Debezium documentation, they only have info for a Cassandra connector and none for a Scylla connector. Because this is a syntax error, and I didn't see any value with single quotes (' ').
Never mind, we ended up using a KSQL table built from the topic, so we will skip the ScyllaDB sink. Thank you anyway.