Kafka Connect File Pulse Not Processing Updates in Existing Files
goyaltu-deshaw opened this issue · 2 comments
goyaltu-deshaw commented
Provide details of the setup you're running
I am running Kafka Connect File Pulse version 2.14.1 on a Linux-based operating system.
Outline your question
I am using the configuration below to deploy the connector. It successfully picks up any newly added files in `fs.listing.directory.path`, but it does not handle files that already exist and are continuously appended to. In other words, new records written to these existing files are never forwarded to the Kafka topic, and I cannot find a configuration option that addresses this.
Thanks!
Configuration
"goyaltu-file-pulse-source-connector-2": {
"connector.name": "filepulse-source-connector",
"transforms.AlignSchemaWithRegistry.schema.registry.urls": "<>",
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"tasks.max": "1",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"topic": "raft.public.goyaltu.example_app.filepulse2",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"fs.listing.directory.path": "/codemill/goyaltu/example_streaming_webapp/csvfiles",
"fs.listing.interval.ms": "10000",
"file.filter.regex.pattern": ".*\\.csv",
"offset.strategy": "name + size + lastmodified",
"file.input.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"filters": "ParseCSVLine",
"filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"filters.ParseCSVLine.auto.generate.column.names": "true",
"filters.ParseCSVLine.trim.column": "true",
"filters.ParseCSVLine.separator": ",",
"tasks.file.status.storage.bootstrap.servers": "<>",
"tasks.file.status.storage.topic": "raft.public.goyaltu.connect-file-pulse-status-2",
"tasks.file.status.storage.producer.security.protocol": "SASL_PLAINTEXT",
"tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
"tasks.file.status.storage.producer.request.timeout.ms": "20000",
"tasks.file.status.storage.consumer.security.protocol": "SASL_PLAINTEXT",
"tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
"tasks.file.status.storage.consumer.request.timeout.ms": "20000"
}
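One reader option that looks relevant is `read.max.wait.ms` on the `RowFileInputReader`, which (per the File Pulse documentation) is the maximum time in milliseconds the reader will wait for more bytes after reaching end-of-file. A sketch of how it might be set to keep reading files that are still growing; this is untested, and placing the property directly in the connector config is my assumption:

```json
{
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "read.max.wait.ms": "60000"
}
```

With a value of 60000, the reader would wait up to one minute at end-of-file for new records before marking the file completed; whether this fully covers the continuously-updated-file case is exactly what I am trying to confirm.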
goyaltu-deshaw commented