Column names not quoted in `CREATE TABLE` query
avelanarius opened this issue · 0 comments
Scylla Sink Connector can automatically create a Scylla table based on the Kafka message schema. However, the generated `CREATE TABLE` query does not quote column names correctly. For example, a column named `one two three` (with spaces) must be written in quotes as `"one two three"` (see the `quoteIfNecessary` method in the Java Driver).
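For illustration, the quoting rule could be sketched as follows. This is a minimal, self-contained sketch and not the driver's actual implementation: the class name `CqlIdentifiers`, the regular expression, and the short reserved-word sample are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not the driver's or the connector's code.
final class CqlIdentifiers {
    // Assumption: only lower-case alphanumeric identifiers (plus '_') are safe unquoted.
    private static final Pattern UNQUOTED = Pattern.compile("[a-z][a-z0-9_]*");

    // Deliberately incomplete sample of reserved CQL keywords (assumption).
    private static final Set<String> RESERVED = new HashSet<>(
            Arrays.asList("select", "from", "where", "table", "primary", "order"));

    private CqlIdentifiers() {}

    // Returns the identifier unchanged when it is safe, otherwise wraps it in
    // double quotes and doubles any embedded double quote.
    static String quoteIfNecessary(String id) {
        if (UNQUOTED.matcher(id).matches() && !RESERVED.contains(id)) {
            return id;
        }
        return "\"" + id.replace("\"", "\"\"") + "\"";
    }
}
```

With this sketch, `CqlIdentifiers.quoteIfNecessary("v")` leaves the name untouched, while names containing spaces, punctuation, or upper-case letters come back wrapped in double quotes.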
Reproducer:
- Scylla (cqlsh):
CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
- In Kafka, create a `mytable` topic.
- Start the connector with the following configuration:
{
  "key.converter.schemas.enable": "true",
  "value.converter.schemas.enable": "true",
  "name": "mytableConnector",
  "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "topics": [ "mytable" ],
  "scylladb.contact.points": "127.0.0.2",
  "scylladb.port": "9042",
  "scylladb.keyspace": "ks"
}
- Write the message to Kafka using the console producer (the columns that show the issue are `primary key` and `ck!@#$%^&*()`):
$ bin/kafka-console-producer --topic mytable --bootstrap-server localhost:9092 --property parse.key=true --property "key.separator=?"
{"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "primary key"}, {"type": "int64", "optional": false, "field": "ck!@#$%^&*()"} ], "optional": false, "name": "mytablePK"}, "payload": {"primary key": 5, "ck!@#$%^&*()": 10}}?{"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "primary key"}, {"type": "int64", "optional": false, "field": "ck!@#$%^&*()"}, {"type": "int64", "optional": false, "field": "v"} ], "optional": false, "name": "mytableVAL"}, "payload": {"primary key": 5, "ck!@#$%^&*()": 10, "v": 15}}
- The connector fails. In the logs (`/tmp/confluent.XXXXXX/connect/logs/connect.log` in the case of Confluent) you can see:
CREATE TABLE ks.mytable( primary key bigint, ck!@#$%^&*() bigint, v bigint, PRIMARY KEY((primary key, ck!@#$%^&*())))

The correct query should be:
CREATE TABLE ks.mytable ( "primary key" bigint, "ck!@#$%^&*()" bigint, v bigint, PRIMARY KEY("primary key", "ck!@#$%^&*()"));
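
A possible shape of the fix, sketched under assumptions: the class `CreateTableSketch` and its methods are hypothetical names, not the connector's code. The inline `quote` helper uses a simplified rule that ignores reserved keywords. The idea is to pass every identifier through a quoting step before it is concatenated into the statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the connector's real table-creation code may differ.
final class CreateTableSketch {
    private CreateTableSketch() {}

    // Simplified quoting rule (assumption): quote anything that is not a
    // lower-case alphanumeric identifier, doubling embedded double quotes.
    static String quote(String name) {
        return name.matches("[a-z][a-z0-9_]*")
                ? name
                : "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // Builds a CREATE TABLE statement; columns maps column name to CQL type
    // and is iterated in the map's own order.
    static String createTable(String keyspace, String table,
                              Map<String, String> columns, List<String> primaryKey) {
        List<String> defs = new ArrayList<>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            defs.add(quote(column.getKey()) + " " + column.getValue());
        }
        List<String> keys = new ArrayList<>();
        for (String key : primaryKey) {
            keys.add(quote(key));
        }
        return "CREATE TABLE " + quote(keyspace) + "." + quote(table) + " ( "
                + String.join(", ", defs)
                + ", PRIMARY KEY(" + String.join(", ", keys) + "));";
    }
}
```

Called with an insertion-ordered map of the three columns from the reproducer (`primary key`, `ck!@#$%^&*()`, `v`, all `bigint`) and the first two as the primary key, this sketch produces the quoted statement shown above.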