mmolimar/kafka-connect-fs

Getting duplicate records in Kafka


Below is my connect-file-source.properties:

name=local-file-source1
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data/test_file
topic=connect-test1
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=50000
policy.recursive=true
poll.interval.ms=0
policy.regexp=.*
policy.batch_size=0
#policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
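
For context, the connector can be started in standalone mode with something like the following (the worker properties path is just a placeholder for the local setup):

bin/connect-standalone.sh config/connect-standalone.properties connect-file-source.properties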

When I run:
echo "bbb" >> /data/test_file/1.txt
echo "ddd" >> /data/test_file/1.txt

I get the following from Kafka:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}

Below is the relevant log:
[2023-02-20 13:21:01,087] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:01,088] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:01,110] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 16, blocks = [[offset = 0, length = 16, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
[2023-02-20 13:21:01,131] INFO [local-file-source1|task-0] [Producer clientId=connector-producer-local-file-source1-0] Resetting the last seen epoch of partition connect-test1-0 to 0 since the associated topicId changed from null to pOAivNaKReC7FstKHYOn_A (org.apache.kafka.clients.Metadata:402)

[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/1.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/2.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/3.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/6.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/5.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:11,220] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 24, blocks = [[offset = 0, length
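
If it helps with debugging, the source offsets that Connect has committed for each file can be inspected. In distributed mode (assuming the default offset.storage.topic=connect-offsets and a local broker) something like the command below works; in standalone mode the offsets live in the file configured via offset.storage.file.filename instead:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning --property print.key=true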

@mmolimar, do you have any further updates or a plan to fix this issue? After upgrading to the master branch, we still see duplicate data.