scylladb/kafka-connect-scylladb

Connector fails to start when TTL is set using "scylladb.ttl"

pushpavanthar opened this issue · 0 comments

The connector fails to start when the "scylladb.ttl" config is set. There seems to be a casting exception caused by the datatype of the TTL setting in io.connect.scylladb.ScyllaDbSinkConnectorConfig.class.

The exception below is observed when the connector is POSTed.

[2020-07-24 06:28:29,182] ERROR WorkerConnector{id=scylladb.sink.dp.ksql_t.poc_group_agg_cluster} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
	at org.apache.kafka.common.config.AbstractConfig.getInt(AbstractConfig.java:173)
	at io.connect.scylladb.ScyllaDbSinkConnectorConfig.<init>(ScyllaDbSinkConnectorConfig.java:93)
	at io.connect.scylladb.ScyllaDbSinkConnector.start(ScyllaDbSinkConnector.java:45)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
	at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
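
The trace points at a typed getter (AbstractConfig.getInt) casting a stored String to Integer. The following is a minimal stand-alone sketch of how that can happen, not the connector's actual code. It assumes the getter is an unchecked cast of whatever object was stored at parse time, and that "scylladb.ttl" ends up stored as a String (for example because it is declared with a non-integer type). The class TtlCastSketch, the parse helper, and the declaredAsInt flag are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class TtlCastSketch {
    // Hypothetical stand-in for config parsing: the value is stored
    // as whatever type the key was declared with.
    public static Map<String, Object> parse(Map<String, String> raw, boolean declaredAsInt) {
        Map<String, Object> parsed = new HashMap<>();
        String ttl = raw.get("scylladb.ttl");
        if (declaredAsInt) {
            parsed.put("scylladb.ttl", Integer.valueOf(ttl)); // stored as Integer
        } else {
            parsed.put("scylladb.ttl", ttl);                  // stored as String
        }
        return parsed;
    }

    // Assumed shape of a typed getter: an unchecked cast of the stored object.
    public static int getInt(Map<String, Object> parsed, String key) {
        return (Integer) parsed.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("scylladb.ttl", "600");

        // Declared as an integer: the cast succeeds.
        System.out.println(getInt(parse(raw, true), "scylladb.ttl"));

        // Stored as a String: the cast fails, as in the trace above.
        try {
            getInt(parse(raw, false), "scylladb.ttl");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If this assumption holds, the declared type of the setting and the getter used to read it would need to agree; which side is wrong in the connector is not established by the trace alone.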

Connector config I used, for reference:

{
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "scylladb.port": "9042",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "tasks.max": "1",
    "topics": "poc_group_agg_test",
    "scylladb.contact.points": "scylla.hostname.com",
    "transforms": "createKey",
    "behavior.on.error": "FAIL",
    "scylladb.password": "*********",
    "key.converter.schemas.enable": "false",
    "scylladb.username": "username",
    "scylladb.keyspace": "test",
    "transforms.createKey.fields": "GROUP_ID,WINDOW_START,WINDOW_END",
    "scylladb.security.enabled": "true",
    "scylladb.offset.storage.table.enable": "true",
    "scylladb.ttl": "600",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}