streamthoughts/kafka-connect-file-pulse

Kafka Connect File Pulse Not Processing Updates in Existing Files

goyaltu-deshaw opened this issue · 2 comments

Provide details of the setup you're running

I am running Kafka Connect File Pulse version 2.14.1 on a Linux-based operating system.

Outline your question

I am using the configuration below to deploy the connector. It successfully picks up any file newly added under fs.listing.directory.path, but it does not handle files that are already present and are continuously appended to: new records written to these existing files are never published to the Kafka topic. I can't find a configuration option that addresses this.

Thanks!

Configuration

            "goyaltu-file-pulse-source-connector-2": {
                "connector.name": "filepulse-source-connector",
                "transforms.AlignSchemaWithRegistry.schema.registry.urls": "<>",
                "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
                "tasks.max": "1",
                "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
                "topic": "raft.public.goyaltu.example_app.filepulse2",
                "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
                "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
                "fs.listing.directory.path": "/codemill/goyaltu/example_streaming_webapp/csvfiles",
                "fs.listing.interval.ms": "10000",
                "file.filter.regex.pattern": ".*\\.csv",
                "offset.strategy": "name + size + lastmodified",
                "file.input.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
                "filters": "ParseCSVLine",
                "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
                "filters.ParseCSVLine.auto.generate.column.names": "true",
                "filters.ParseCSVLine.trim.column": "true",
                "filters.ParseCSVLine.separator": ",",
                "tasks.file.status.storage.bootstrap.servers": "<>",
                "tasks.file.status.storage.topic": "raft.public.goyaltu.connect-file-pulse-status-2",
                "tasks.file.status.storage.producer.security.protocol": "SASL_PLAINTEXT",
                "tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
                "tasks.file.status.storage.producer.request.timeout.ms": "20000",
                "tasks.file.status.storage.consumer.security.protocol": "SASL_PLAINTEXT",
                "tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
                "tasks.file.status.storage.consumer.request.timeout.ms": "20000"
            }