Messages are not inserting into ScyllaDB
dhgokul opened this issue · 2 comments
I connected a Snowplow Thrift publisher function and Kafka to post Google Analytics data as messages to a topic.
When a webpage is refreshed, the IP and browser details are collected from Google Analytics by Snowplow, and the Thrift function posts the IP and browser to a Kafka Avro topic.
The reference for the above work:
Now the messages posted to the topic from the Thrift function have to be dumped into ScyllaDB, so I used the config below:
{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "gaparsed3",
    "scylladb.contact.points" : "localhost",
    "scylladb.port" : "9042",
    "scylladb.keyspace" : "mqtest",
    "key.converter" : "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url" : "http://localhost:8081",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url" : "http://localhost:8081",
    "key.converter.schemas.enable" : "true",
    "value.converter.schemas.enable" : "true",
    "transforms" : "createKey",
    "transforms.createKey.fields" : "timestamp",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}
In Scylla, the keyspace mentioned in the config was created, along with a table named after the topic:
CREATE KEYSPACE mqtest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TABLE gaparsed3 (ip varchar, browser varchar, timestamp varchar, PRIMARY KEY (timestamp));
The JSON config then loaded successfully.
Using kafka-avro-console-producer:
./bin/kafka-avro-console-producer --bootstrap-server localhost:9092 --topic dbtest --property value.schema=' {"type":"record","name":"gaGeneric","fields":[{"name":"ip","type":["string","null"]},{"name":"browser","type":["string","null"]},{"name":"timestamp","type":["string","null"]}]} '
and a message such as:
{"ip":{"string":"xxx.xxx.xxx.xx"},"browser":{"string":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Safari/537.36"},"timestamp":{"string":"2021-03-19 10:55:16.758"}}
the values are inserted into the table.
But when using the Thrift Java function, the messages are not inserted into the DB, even though they do show up in kafka-avro-console-consumer:
root@kafka:~/confluent-6.1.0# ./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic gaparsed3
{"ip":{"string":"xxx.xx.xxx.xxx"},"browser":{"string":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Safari/537.36"},"timestamp":{"string":"2021-03-19 12:48:43.712"}}
The following error is raised for the Scylla connector, found by checking the log with confluent local services connect log. The error log is:
[2021-03-19 12:48:44,133] ERROR WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic gaparsed3 to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
How can I overcome this issue? Does anything need to change in the JSON config file?
From the stack trace it seems that the error is on the Confluent converter side (io.confluent.connect.avro.AvroConverter), not necessarily in the Sink Connector itself.
The org.apache.kafka.common.errors.SerializationException: Unknown magic byte! error means that some message on gaparsed3 does not start with a 0 byte, i.e. it does not conform to Confluent's Wire Format.
One thing you could check is whether the messages on this topic are in that format (0 byte, then the schema id in Confluent's Schema Registry, then Avro data). You should also check in Confluent's Control Center whether there is a schema defined on that topic (in the Schema tab) - that should be created automatically when sending messages to the topic in proper Avro format (with proper converters on the producer side).
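For instance, here is a minimal sketch (not from this thread) of inspecting the raw bytes on the topic with a plain consumer, assuming the kafka-clients library and the broker/topic names used above; the class name and group id are made up for illustration:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class WireFormatCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wire-format-check"); // illustrative group id
        // Read from the beginning of the topic, like the Sink Connector does
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("gaparsed3"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                check("key", r.key(), r.offset());
                check("value", r.value(), r.offset());
            }
        }
    }

    // Confluent wire format: 1 magic byte (0), 4-byte big-endian schema id, then Avro payload
    static void check(String part, byte[] bytes, long offset) {
        if (bytes == null) {
            System.out.printf("offset %d %s: null%n", offset, part);
        } else if (bytes.length >= 5 && bytes[0] == 0) {
            int schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt();
            System.out.printf("offset %d %s: wire format OK, schema id %d%n", offset, part, schemaId);
        } else {
            System.out.printf("offset %d %s: NOT in wire format (first byte %d)%n",
                    offset, part, bytes.length > 0 ? bytes[0] : -1);
        }
    }
}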
When running ./bin/kafka-avro-console-consumer, it reads from the current time (instead of the beginning of the topic), while the Sink Connector reads from the beginning. It could be that there is some corrupted message early in that topic, so the Sink Connector fails while kafka-avro-console-consumer never sees that message (and doesn't fail). By using the --from-beginning flag you could check whether all messages are parsed correctly (if not, delete and recreate the topic to make it empty).
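For example, the same consumer command as above with the flag added:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic gaparsed3 --from-beginning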
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
In my case, for the schema
{"type":"record","name":"gaGeneric","fields":[{"name":"ip","type":["string","null"]},{"name":"browser","type":["string","null"]},{"name":"timestamp","type":["string","null"]}]}
when produced with the console producer, the key comes back with a null value:
console-consumer:
null {"ip":{"string":"000.00.000.000"},"browser":{"string":"APPLE/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Safari/537.36"},"timestamp":{"string":"2021-03-19 10:55:16.800"}}
When the message is produced from the Thrift function, the message comes back with a key value:
[B@28952dea {"ip":{"string":"000.00.000.000"},"browser":{"string":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.162 Safari/537.36"},"timestamp":{"string":"2021-03-22 10:58:22.114"}}
With this kind of key I got the error:
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
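The [B@28952dea key is the default toString of a Java byte[], which suggests the Thrift function's producer serializes the key as raw bytes rather than in Confluent's wire format, so an AvroConverter configured for keys fails on it. A hypothetical sketch of such a producer setup (the actual Thrift producer code was not shared here; everything below is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class SuspectedProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("schema.registry.url", "http://localhost:8081");
        // Key: serialized as plain bytes, NOT Confluent wire format, so an
        // AvroConverter on the key side fails with "Unknown magic byte!"
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Value: Avro via Schema Registry, starts with the 0 magic byte
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        try (KafkaProducer<byte[], Object> producer = new KafkaProducer<>(props)) {
            // record construction elided; the point is the key/value serializer mismatch
        }
    }
}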
So I changed the config from
"key.converter" : "io.confluent.connect.avro.AvroConverter",
to
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
Now it is dumping into the DB for both console-produced messages and messages from the external Thrift function.
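For reference, a sketch of the resulting connector config, assuming nothing else changed from the config at the top; the key.converter.schema.registry.url and key.converter.schemas.enable entries are dropped since StringConverter ignores them:

{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "gaparsed3",
    "scylladb.contact.points" : "localhost",
    "scylladb.port" : "9042",
    "scylladb.keyspace" : "mqtest",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url" : "http://localhost:8081",
    "value.converter.schemas.enable" : "true",
    "transforms" : "createKey",
    "transforms.createKey.fields" : "timestamp",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}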