mmolimar/kafka-connect-fs

Error reading file from FTP

rajeshpyne opened this issue · 4 comments

Please find the configuration file below:
name=ftp-source-connector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=sftp://<user_name>:<pass_word>@ip:<remote_directory>
topic=ftp_test
policy.fs.fs.sftp.impl=org.apache.hadoop.fs.sftp.SFTPFileSystem
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=100000
policy.recursive=false
policy.regexp=..csv$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=,
file_reader.delimited.header=true
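A side note on the `policy.regexp` value above: in Java regular expressions an unescaped `.` matches any character. That means `..csv$` accepts any name that ends in `csv` and has at least two characters before it, including names with no dot at all, such as `datacsv`. The sketch below compares it with an escaped pattern. It assumes the policy applies the pattern to the bare file name with `Matcher.find()`, which is an assumption about the connector's internals. The class and constant names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper: compares how two candidate values for policy.regexp
// treat a few file names. Assumes the pattern is applied to the bare file
// name with Matcher.find(), which may differ from what the connector does.
final class RegexpCheck {
    static final Pattern AS_CONFIGURED = Pattern.compile("..csv$"); // '.' matches any char
    static final Pattern ESCAPED_DOT = Pattern.compile("\\.csv$");  // literal ".csv" suffix

    static boolean matches(Pattern p, String fileName) {
        return p.matcher(fileName).find();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"report.csv", "datacsv", "report.csv.bak"}) {
            System.out.println(name
                + " configured=" + matches(AS_CONFIGURED, name)
                + " escaped=" + matches(ESCAPED_DOT, name));
        }
    }
}
```

In a `.properties` file the backslash itself has to be doubled (`policy.regexp=\\.csv$`), because `java.util.Properties` treats a single backslash as an escape character and silently drops it before an unrecognized escape.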

Please find the error below:
ERROR Error reading file from FS: <sftp_location>. Keep going... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: sftp://username:password@ip:directory_path
at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:208)
at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:72)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.lambda$make$0(ReflectionUtils.java:28)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:28)
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:19)
at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:205)
... 13 more
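The exception message at the top of the trace identifies the file by its full `sftp://` URI. One way to see how a string like the `fs.uris` value is split into user info, host, port and path is to parse it with `java.net.URI`, which is the type Hadoop's `Path.toUri()` returns. The sketch below is a standalone diagnostic, not part of the connector. The credentials, host, port and directory in it are made-up placeholders. In a URI, the path starts at the first `/` after the host, and whatever sits between `host:` and that slash is read as the port.

```java
import java.net.URI;

// Standalone diagnostic: shows how java.net.URI splits an sftp:// URI.
// The credentials, host, port and directory below are placeholders.
final class SftpUriCheck {
    static String describe(String raw) {
        URI uri = URI.create(raw); // throws IllegalArgumentException on syntax errors
        return "scheme=" + uri.getScheme()
            + " userInfo=" + uri.getUserInfo()
            + " host=" + uri.getHost()
            + " port=" + uri.getPort() // -1 when no port is given
            + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(describe("sftp://user:secret@192.168.0.10:22/home/user/data"));
    }
}
```

If `host` comes back as `null` for a real `fs.uris` value, the authority part could not be parsed as `host[:port]`, and that is worth fixing before looking further.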

@mmolimar, any update on this? I have already used this connector to stream files from S3 to Kafka, but I'm waiting for this error to be fixed so I can stream files from FTP to Kafka.
Any leads on this would be really helpful.


@rajeshpyne What fs.uris value did you use for S3? The documentation is quite limited. Is it s3://bucket-name?

@adiep-cdpq,
For ingesting data from S3, it is:
"fs.uris": "s3://"+bucket-name+"/"+file_path
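The S3 value above is plain string concatenation. As a minimal sketch, the same string can be built with `java.net.URI` so that exactly one `/` separates the bucket from the object path. The helper name, bucket name and path are hypothetical. Which scheme actually works (`s3://`, `s3a://`, ...) depends on which Hadoop S3 filesystem implementation is available to the connector. Hadoop's own S3A connector, for instance, registers `s3a://`.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: builds an fs.uris value for an S3 location.
// Equivalent to "s3://" + bucketName + "/" + filePath for simple names.
final class S3UriBuilder {
    static String fsUri(String scheme, String bucketName, String filePath) {
        // Ensure the object path starts with exactly one leading "/".
        String path = filePath.startsWith("/") ? filePath : "/" + filePath;
        try {
            // URI(scheme, host, path, fragment): the bucket goes in the host slot.
            return new URI(scheme, bucketName, path, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("not a valid URI: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fsUri("s3", "my-bucket", "data/input"));
    }
}
```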

But the problem here is ingesting files over FTP.
I don't understand why @mmolimar states that this connector can ingest data over FTP when he doesn't reply when someone runs into an issue using it for FTP!

This connector uses Hadoop's FileSystem abstraction, so if Hadoop provides a working implementation for a given filesystem, the connector should work with it too (except, perhaps, for some special use cases).
If there is a bug in the source code, please submit a PR with the fix!
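To illustrate the point about Hadoop's FileSystem abstraction: Hadoop picks an implementation class by URI scheme. One place it looks is the configuration key `fs.<scheme>.impl`, and the `policy.fs.fs.sftp.impl` line in the configuration above is meant to set that key. The sketch below is a simplified, stdlib-only model of that lookup, not Hadoop's code. The idea that the connector strips a `policy.fs.` prefix before handing properties to Hadoop is an assumption based on the property name. Real Hadoop also falls back to implementations registered through `ServiceLoader`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Simplified model (not Hadoop's code) of choosing a FileSystem class by scheme.
final class FsImplLookup {
    static final String CONNECTOR_PREFIX = "policy.fs."; // assumed prefix handling

    // Keeps connector properties that start with the prefix, with the prefix removed.
    static Map<String, String> toHadoopConf(Map<String, String> connectorProps) {
        Map<String, String> conf = new HashMap<>();
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith(CONNECTOR_PREFIX)) {
                conf.put(e.getKey().substring(CONNECTOR_PREFIX.length()), e.getValue());
            }
        }
        return conf;
    }

    // Returns the class name configured for the URI's scheme, or null if none is set.
    static String implFor(URI uri, Map<String, String> hadoopConf) {
        return hadoopConf.get("fs." + uri.getScheme() + ".impl");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("policy.fs.fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem");
        props.put("topic", "ftp_test"); // not prefixed, so it is not copied
        Map<String, String> conf = toHadoopConf(props);
        System.out.println(implFor(URI.create("sftp://user:secret@192.168.0.10:22/home/user/data"), conf));
    }
}
```

Keeping the lookup keyed on the scheme is what lets the same connector code read from S3, HDFS or SFTP: only the configured implementation class changes.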