streamthoughts/kafka-connect-file-pulse

CSV Line to proper JSON

srikanthvpai opened this issue · 6 comments

Hi Team,

Thank you @fhussonnois for this connector. Version 2.9.0 is working fine on our cloud setup, but I have a question about the connector: is it possible to get proper JSON out of CSV file consumption?

Currently, once the CSV file is consumed, the messages look like this. The entire header ends up as a single field.

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"RECORD_TYPE,RECORD_ID"}],"optional":false},"payload":{"RECORD_TYPE,RECORD_ID":"TK,-21099418"}}

Is it possible to transform the above message into something like this?

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"RECORD_TYPE"}{"type":"string","optional":true,"field":"RECORD_TYPE"}],"optional":false},"payload":{"RECORD_TYPE":"TK","RECORD_ID":-21099418}}

The connector properties I used:

{
  "name": "LocalCSVFilepulse3-Conn",
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "filters": "ParseCSVLine",
    "filters.ParseCSVLine.extract.column.name": "headers",
    "filters.ParseCSVLine.trim.column": "true",
    "filters.ParseCSVLine.seperator": ",",
    "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
    "fs.cleanup.policy.triggered.on": "COMMITTED",
    "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
    "fs.listing.directory.path": "/mnt/d/ATK/kk-input",
    "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
    "fs.listing.interval.ms": "10000",
    "file.filter.regex.pattern": ".*\\.csv$",
    "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
    "offset.attributes.string": "name",
    "skip.headers": "1",
    "topic": "first_topic",
    "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
    "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
    "tasks.file.status.storage.bootstrap.servers": "<>:9092",
    "tasks.file.status.storage.topic": "connect-file-pulse-status",
    "tasks.file.status.storage.topic.partitions": 10,
    "tasks.file.status.storage.topic.replication.factor": 1,
    "tasks.max": 1,
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.DeleteCleanupPolicy"
  }
}
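
For context, a connector definition in this name/config form is usually submitted to the Kafka Connect REST API with something like the following (a minimal sketch; the host/port localhost:8083 and the file name connector-config.json are assumptions):

curl -X POST -H "Content-Type: application/json" \
     --data @connector-config.json \
     http://localhost:8083/connectors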

Hi @srikanthvpai, I don't know why you got this result (your configuration looks valid). Could you please try using the CSVFilter instead of the DelimitedRowFilter?

Example:

filters=ParseCSVLine
filters.ParseCSVLine.type="io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter"
filters.ParseCSVLine.extract.column.name="headers"
filters.ParseCSVLine.separator=","
filters.ParseCSVLine.trim.column="true"
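
If the connector is configured through the Connect REST API as in the JSON config above, the equivalent filter properties would look roughly like this (a sketch that simply mirrors the properties example above):

"filters": "ParseCSVLine",
"filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"filters.ParseCSVLine.extract.column.name": "headers",
"filters.ParseCSVLine.separator": ",",
"filters.ParseCSVLine.trim.column": "true"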

@fhussonnois Thanks a lot. Yes, it worked; the filter needed to be changed. Do you think this example needs to be updated?

https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/examples/connect-file-pulse-quickstart-csv.json

Regards,

I used the DelimitedRowFilter and the JSON came out fine.

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

This issue was closed because it has been stalled for 30 days with no activity.

Where can I find the docs for the ParseCSVLine filter?