mmolimar/kafka-connect-fs

Avro Serialisation is not working

ibalachandar86 opened this issue · 1 comment

Hi,

I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic.
I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (I added the FsSourceConnector connector jar).
The prerequisites, Kafka Connect configuration, and Kafka connector details are listed below.

Problem Statement:
The Kafka connector below is working and I am able to ingest the CSV into the Kafka topic, but only as plain strings. My goal is to Avro serialise the CSV data and store it in the topic. I am not sure which serialisation configuration is missing from my connect/connector properties.
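For reference, my understanding is that Avro output is driven by the converter settings below, either as worker-level defaults or overridden per connector; this is just a sketch in plain worker-properties form, with the registry URL taken from my setup:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://test-cp-schema-registry:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://test-cp-schema-registry:8081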

Prerequisites:
I have placed the CSV file in a directory on the Kafka Connect pod and created a schema for the CSV in the Confluent Schema Registry.
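In case it helps reproduce the setup, registering the value schema can be done roughly like this (a sketch using the Schema Registry REST API; the subject name sampledata-value follows the default topic naming strategy, and the call assumes the registry service is reachable from where it is run):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}' \
  http://test-cp-schema-registry:8081/subjects/sampledata-value/versions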

Below are the Kafka Connect details (Helm values); a quick check of the worker settings they produce is sketched right after them:
cp-control-center:
  enabled: false

cp-kafka:
  enabled: true

cp-kafka-rest:
  enabled: false

cp-ksql-server:
  enabled: false

cp-schema-registry:
  enabled: true

cp-zookeeper:
  enabled: true

cp-kafka-connect:
  replicaCount: 1

  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always

  servicePort: 8083

  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"

  heapOptions: "-Xms5g -Xmx10g"

  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"

  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"

  cp-schema-registry:
    url: "test-cp-schema-registry:8081"

fullnameOverride: test
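As far as I understand, the configurationOverrides entries above are turned into CONNECT_* environment variables on the Connect container, so a quick sanity check of what the worker actually received is something like (pod name is a placeholder):

kubectl exec -it <connect-pod-name> -- env | grep -i converter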

Below are the Kafka connector details (a quick REST check of the applied config follows the request):
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '
{ "name": "sample",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "/home/appuser/csv",
    "topic": "sampledata",
    "use.latest.version": "true",
    "auto.register.schemas": "false",
    "poll.interval.ms": "10000",
    "auto.create.topics": "false",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.batch_size": "0",
    "policy.recursive": "true",
    "policy.regexp": "^*.csv$",
    "policy.resume.on.error": "false",
    "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "key.enhanced.avro.schema.support": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "value.enhanced.avro.schema.support": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "file_reader.delimited.settings.format.quote": "\"",
    "file_reader.delimited.settings.escape_unquoted": "false",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
    "file_reader.delimited.compression.type": "none",
    "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
    "file_reader.delimited.settings.delimiter_detection": "false",
    "file_reader.delimited.compression.concatenated": "true",
    "file_reader.delimited.settings.format.comment": "#",
    "file_reader.delimited.settings.format.quote_escape": "\"",
    "file_reader.delimited.settings.format.delimiter": ",",
    "file_reader.encryption.passphrase": "",
    "file_reader.delimited.settings.max_chars_per_column": "4096",
    "file_reader.delimited.settings.line_separator_detection": "false",
    "file_reader.delimited.settings.format.line_separator": "\n",
    "file_reader.delimited.settings.max_columns": "512",
    "file_reader.encryption.type": "NONE",
    "file_reader.delimited.settings.header": "true",
    "file_reader.delimited.settings.ignore_leading_whitespaces": "true",
    "file_reader.delimited.settings.rows_to_skip": "0",
    "file_reader.batch_size": "0",
    "file_reader.encryption.secret": ""
  }
}'
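To confirm the configuration and state the connector actually ended up with, the Connect REST API can be queried:

curl http://localhost:8083/connectors/sample/config
curl http://localhost:8083/connectors/sample/status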

CSV file:
c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3

Schema in Schema Registry:
{"subject":"sampledata-value","version":1,"id":1,"schema":"{"type":"record","name":"sampledata","namespace":"default","fields":[{"name":"c1","type":"string"},{"name":"c2","type":"string"},{"name":"c3","type":"string"}]}"}

Data in Topic:
/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
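Note that the plain console consumer just prints the raw bytes, so to actually decode Avro against the registry I would expect to use something like the following (the avro console consumer shipped with the Confluent images; service names are from my setup):

/bin/kafka-avro-console-consumer --topic sampledata --from-beginning \
  --bootstrap-server cef-cp-kafka-headless:9092 \
  --property schema.registry.url=http://test-cp-schema-registry:8081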

Any update on this? We are facing a similar issue with Parquet as the source.