splunk/kafka-connect-splunk

Send logs from Kafka to a Splunk HEC

rushins opened this issue · 17 comments

Hi there,

I have configured kafka-connect-splunk, and my connector-plugins output shows the connector class "com.splunk.kafka.connect.SplunkSinkConnector". I then created the connector with the JSON below, but no data seems to be reaching Splunk: the index is empty when I run a search.

Any clue what is going wrong?


curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
  "name": "kafka-connect-splunk",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "3",
    "topics": "HSL-RTLogEventIn",
    "splunk.indexes": "etd",
    "splunk.hec.uri": "http://kakfa.dum.corp:8000",
    "splunk.hec.token": "93906c53-1291-4695-80cd-13c3859eae7c",
    "splunk.hec.raw": "true",
    "splunk.hec.ack.enabled": "false"
  }
}'


Is your index associated with the token you are using?
Can you post the errors you are seeing?
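You can test that association directly, bypassing the connector entirely. A minimal sketch, assuming HEC is listening on its default port 8088 with SSL enabled (note that your splunk.hec.uri points at port 8000, which is normally the Splunk Web port, not the HEC port):

curl -k https://kakfa.dum.corp:8088/services/collector/event \
  -H "Authorization: Splunk 93906c53-1291-4695-80cd-13c3859eae7c" \
  -d '{"event": "hello from curl", "index": "etd"}'

If the token is valid and allowed to write to etd, this returns {"text":"Success","code":0} and the event shows up in a search; any other response points at the HEC setup rather than the connector.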

I don't think so; there is no option in the index properties to attach the token. The token info is only passed through the JSON provided above. The Splunk index is a plain index just like all the other indexes, but the data type is HEC.

I followed this guide and enabled all tokens in the global settings:
https://docs.splunk.com/Documentation/Splunk/8.0.5/Data/UsetheHTTPEventCollector

Can you post the errors you are seeing? I can't help you out without any logs. Also, follow https://docs.splunk.com/Documentation/KafkaConnect/1.2.0/User/Troubleshootyourdeployment to enable debug logging.
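If I remember that page correctly, it amounts to adding a DEBUG logger for the connector's classes to the worker's log4j config and restarting the worker, roughly:

echo "log4j.logger.com.splunk=DEBUG" >> config/connect-log4j.properties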

Which log files do you need to review? I checked server.log, controller.log, kafkaServer-gc.log.0.current, server.log.2020-08-13-12, and zookeeper-gc.log.0.current; none of these logs shows any errors.

The log file associated with connect-distributed.sh or connect-standalone.sh.

I run "bin/connect-distributed.sh config/connect-distributed.properties", and I checked "connect-distributed.sh".

I see no log or trace file referenced inside this script, and connect-distributed.properties does not reference any log file either:

if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] connect-distributed.properties"
    exit 1
fi

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
    -daemon)
        EXTRA_ARGS="-daemon "$EXTRA_ARGS
        shift
        ;;
    *)
        ;;
esac

exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"


connect-log4j.properties (the file KAFKA_LOG4J_OPTS points at above):

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

Any clue which log file to check?

Any ideas?

When you run connect-distributed.sh, logs are printed to stdout; those logs can help troubleshoot any errors. Also, if you have a support entitlement with Splunk, please reach out to Splunk support and open a case.
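If you want a file you can grep afterwards, one simple option is to capture stdout yourself when starting the worker (the log path here is just an example):

bin/connect-distributed.sh config/connect-distributed.properties 2>&1 | tee /tmp/connect-worker.log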

I have opened a support ticket with Splunk; they bounced it back asking me to check for driver issues.

Here is the output from connect-distributed.sh and a few errors that came at the end. Let me know if you see anything that helps solve it.


Error:

[2020-08-19 20:38:40,019] ERROR WorkerSinkTask{id=kafka-connect-splunk-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

[2020-08-19 20:38:40,020] ERROR WorkerSinkTask{id=kafka-connect-splunk-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)


Full output:

[2020-08-19 20:38:37,020] ERROR WorkerSinkTask{id=kafka-connect-splunk-2} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'INDICATOR_PATTERNRTParser': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"INDICATOR_PATTERNRTParser HeartbeatIndicator10.48.185.70IP Addresslvhanaetd.pal.sap.corp10.48.185.70NumberNumberJava Heap Size in MB (Used)Java Heap Size in MB (Max)62436410Sum of queue items2020-08-20T03:38:37.717lvhanaetd.pal.sap.corp10.48.185.70"; line: 1, column: 35]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'INDICATOR_PATTERNRTParser': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"INDICATOR_PATTERNRTParser HeartbeatIndicator10.48.185.70IP Addresslvhanaetd.pal.sap.corp10.48.185.70NumberNumberJava Heap Size in MB (Used)Java Heap Size in MB (Max)62436410Sum of queue items2020-08-20T03:38:37.717lvhanaetd.pal.sap.corp10.48.185.70"; line: 1, column: 35]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3508)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken(UTF8StreamJsonParser.java:2826)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2611)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-19 20:38:37,021] ERROR WorkerSinkTask{id=kafka-connect-splunk-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-08-19 20:38:37,022] INFO kafka-connect-splunk task ends with config=splunkURI:http://lvpalakjfrog1.pal.sap.corp:8000, raw:true, ack:false, indexes:etd, sourcetypes:, sources:, headerSupport:false, headerCustom:, httpKeepAlive:true, validateCertificates:true, trustStorePath:, socketTimeout:60, eventBatchTimeout:300, ackPollInterval:10, ackPollThreads:2, maxHttpConnectionPerChannel:2, totalHecChannels:2, enrichment:, maxBatchSize:500, numberOfThreads:1, lineBreaker:, maxOutstandingEvents:1000000, maxRetries:-1, useRecordTimestamp:true, hecEventFormatted:false, trackData:false, headerSupport:false, headerCustom:, headerIndex:splunk.header.index, headerSource:splunk.header.source, headerSourcetype:splunk.header.sourcetype, headerHost:splunk.header.host (com.splunk.kafka.connect.SplunkSinkTask:335)
[2020-08-19 20:38:37,022] INFO [Consumer clientId=consumer-5, groupId=connect-kafka-connect-splunk] Member consumer-5-69680f0f-880b-42f4-8497-d9792f2f761b sending LeaveGroup request to coordinator lvhanakafka.pal.sap.corp:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
[2020-08-19 20:38:39,691] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:871)
[2020-08-19 20:38:39,691] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2020-08-19 20:38:39,692] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2020-08-19 20:38:39,696] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2020-08-19 20:38:39,696] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Setting newly assigned partitions: HSL-RTLogEventIn-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2020-08-19 20:38:39,700] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Resetting offset for partition HSL-RTLogEventIn-0 to offset 2341537. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2020-08-19 20:38:40,019] ERROR WorkerSinkTask{id=kafka-connect-splunk-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'INDICATOR_PATTERNRTParser': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"INDICATOR_PATTERNRTParser HeartbeatIndicator10.48.185.70IP Addresslvhanaetd.pal.sap.corp10.48.185.70NumberNumberJava Heap Size in MB (Used)Java Heap Size in MB (Max)62436410Sum of queue items2020-08-20T03:38:40.717lvhanaetd.pal.sap.corp10.48.185.70"; line: 1, column: 35]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'INDICATOR_PATTERNRTParser': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"INDICATOR_PATTERNRTParser HeartbeatIndicator10.48.185.70IP Addresslvhanaetd.pal.sap.corp10.48.185.70NumberNumberJava Heap Size in MB (Used)Java Heap Size in MB (Max)62436410Sum of queue items2020-08-20T03:38:40.717lvhanaetd.pal.sap.corp10.48.185.70"; line: 1, column: 35]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3508)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken(UTF8StreamJsonParser.java:2826)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2611)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-19 20:38:40,020] ERROR WorkerSinkTask{id=kafka-connect-splunk-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-08-19 20:38:40,020] INFO kafka-connect-splunk task ends with config=splunkURI:http://lvpalakjfrog1.pal.sap.corp:8000, raw:true, ack:false, indexes:etd, sourcetypes:, sources:, headerSupport:false, headerCustom:, httpKeepAlive:true, validateCertificates:true, trustStorePath:, socketTimeout:60, eventBatchTimeout:300, ackPollInterval:10, ackPollThreads:2, maxHttpConnectionPerChannel:2, totalHecChannels:2, enrichment:, maxBatchSize:500, numberOfThreads:1, lineBreaker:, maxOutstandingEvents:1000000, maxRetries:-1, useRecordTimestamp:true, hecEventFormatted:false, trackData:false, headerSupport:false, headerCustom:, headerIndex:splunk.header.index, headerSource:splunk.header.source, headerSourcetype:splunk.header.sourcetype, headerHost:splunk.header.host (com.splunk.kafka.connect.SplunkSinkTask:335)
[2020-08-19 20:38:40,020] INFO [Consumer clientId=consumer-6, groupId=connect-kafka-connect-splunk] Member consumer-6-de5d3d9e-bb61-4d12-af1f-9c311091a699 sending LeaveGroup request to coordinator lvhanakafka.pal.sap.corp:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)


Even though it throws these errors, the connector-plugins output still lists "SplunkSinkConnector"?
http://lvkafka:8083/connector-plugins


[
  {
    "class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "type": "sink",
    "version": "v1.0.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.2.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.2.0"
  }
]

Hey @rushins, it looks like you are facing deserialization issues. Look at https://docs.splunk.com/Documentation/KafkaConnect/1.2.0/User/InstallSplunkKafkaConnect; it looks like you might have missed point 5, specifically:

#Required configurations for Splunk Connect for Kafka
bootstrap.servers=<BOOTSTRAP_SERVER1,BOOTSTRAP_SERVER2,BOOTSTRAP_SERVER3 >
plugin.path=<PLUGIN_PATH>
key.converter=<org.apache.kafka.connect.storage.StringConverter|org.apache.kafka.connect.json.JsonConverter|io.confluent.connect.avro.AvroConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter|org.apache.kafka.connect.json.JsonConverter|io.confluent.connect.avro.AvroConverter>

Thanks, I will keep assisting you here. I asked you to contact support so that they could help you more proactively.
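Also, /connector-plugins only tells you which plugins are installed on the worker; it says nothing about whether your connector's tasks are alive. To see the failure state (including the same stack trace you pasted), query the standard Connect status endpoint:

curl http://lvkafka:8083/connectors/kafka-connect-splunk/status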

I have all of these parameters as per the guide, but I set some parameters to "false" due to several errors.
Here are my parameters as set before execution:


bootstrap.servers=lvkafka.domain:9092
plugin.path=/kafka/plugins/
#key.converter=org.apache.kafka.connect.storage.StringConverter|org.apache.kafka.connect.json.JsonConverter|io.confluent.connect.avro.AvroConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter|org.apache.kafka.connect.json.JsonConverter|io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
consumer.auto.offset.reset=latest
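Note: java.util.Properties keeps the last value when a key is repeated, so in the file above the two String converters win over the Avro ones. That also suggests the worker that produced the JsonConverter stack trace was not actually running with this file, or was not restarted after the change. A cleaned-up equivalent with the duplicates and commented-out templates removed:

bootstrap.servers=lvkafka.domain:9092
plugin.path=/kafka/plugins/
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
consumer.auto.offset.reset=latest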

Is your data in JSON? You might want to try org.apache.kafka.connect.json.JsonConverter.

No, it's not JSON.

@rushins, from the error:
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more

it's a JSON deserialization error: the worker is using JsonConverter on records that are not JSON.
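Since the records are not JSON, the worker must stop parsing them as JSON. Either set value.converter=org.apache.kafka.connect.storage.StringConverter in the worker properties and restart the worker, or override the converter on this one connector. A sketch using the standard Connect REST API (PUT on /connectors/<name>/config creates or updates the connector):

curl localhost:8083/connectors/kafka-connect-splunk/config -X PUT -H "Content-Type: application/json" -d '{
  "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
  "tasks.max": "3",
  "topics": "HSL-RTLogEventIn",
  "splunk.indexes": "etd",
  "splunk.hec.uri": "http://kakfa.dum.corp:8000",
  "splunk.hec.token": "93906c53-1291-4695-80cd-13c3859eae7c",
  "splunk.hec.raw": "true",
  "splunk.hec.ack.enabled": "false",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}'

Updating the config this way makes Connect restart the tasks; otherwise, since the log says the tasks "will not recover until manually restarted", restart each failed task with POST /connectors/kafka-connect-splunk/tasks/<id>/restart.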

Closing due to inactivity. If the issue still persists, please open a new one. Thanks.