streamthoughts/kafka-connect-file-pulse

File pulse GCS to kafka

izzie88 opened this issue · 6 comments

Hello Engineers,

I am trying to stream a CSV file from GCS to Kafka, but the connector keeps failing on setup.
Please see my config file below:

name=gcs-source
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing
tasks.max=1
filters.ParseDelimitedRow.extractColumnName=headers
tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.GcsRowFileInputReader
filters=ParseDelimitedRow
tasks.file.status.storage.bootstrap.servers=kafka1:19091,kafka2:19092,kafka3:19093,kafka4:19094,kafka5:19095,kafka6:19096,kafka7:19097,kafka8:19098,kafka9:19099,kafka11:17091,kafka12:17092,kafka13:17093,kafka14:17094,kafka15:17095,kafka16:17096,kafka17:17097,kafka18:17098,kafka19:17099
gcs.bucket.name=transactions_kafka_source
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
gcs.credentials.path=/etc/kafka-connect/jars/gcs-ro.json
filters.ParseDelimitedRow.separator=;
topic=new-journal_entries-v5
filters.ParseDelimitedRow.trimColumn=true
fs.listing.interval.ms=10000
file.filter.regex.pattern=.*.csv$

Are you getting a ClassNotFoundException?

This is the error:

org.apache.kafka.common.config.ConfigException: Invalid value io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing for configuration fs.listing.class: Class io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:475)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
at io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig.<init>(CommonSourceConfig.java:107)
at io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig.<init>(SourceTaskConfig.java:96)
at io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig.<init>(SourceTaskConfig.java:86)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.start(FilePulseSourceTask.java:127)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

This is my config:

connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing
tasks.max=1
tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.GcsRowFileInputReader
tasks.file.status.storage.bootstrap.servers=kafka1:19091,kafka2:19092,kafka3:19093,kafka4:19094,kafka5:19095,kafka6:19096,kafka7:19097,kafka8:19098,kafka9:19099,kafka11:17091,kafka12:17092,kafka13:17093,kafka14:17094,kafka15:17095,kafka16:17096,kafka17:17097,kafka18:17098,kafka19:17099
gcs.bucket.name=kuwego_cba_journal_transactions_kafka_source
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy
gcs.credentials.path=/etc/kafka-connect/jars/gcs-ro.json
topic=new-journal_entries-v5
fs.listing.interval.ms=10000
file.filter.regex.pattern=.*.csv$

I dug a little deeper and I know why that error is happening, @izzie88.

If you check the classes included in the Confluent Hub zip, you'll see that GCS and S3 (and others) are not included in the 2.10 release. The same is true of the GitHub release. The latest versions do include them, and so does 2.9, so I'm not sure why 2.10 does not.

What you can try is installing from the zip of the latest release.
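
To confirm whether a given release archive actually ships the GCS classes, one option is to scan the jars it contains for the class named in the error. A minimal sketch in Python, assuming the plugin zip has already been unpacked into a local directory; the function name `jars_containing` and the directory in the usage comment are hypothetical:

```python
import zipfile
from pathlib import Path


def jars_containing(plugin_dir, class_name):
    """Return the jar files under plugin_dir that contain class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            # Ignore files that have a .jar name but are not valid archives.
            continue
    return hits


# Usage (the directory is a placeholder for wherever the zip was unpacked):
# jars_containing(
#     "/path/to/unpacked/plugin",
#     "io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing",
# )
```

An empty result for the unpacked archive would be consistent with the "could not be found" error above; a non-empty result would point to a plugin path or installation problem instead.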

Apart from what was previously described, there seems to be an issue with dependencies in the project, as presented by the logs of a deployment I'm currently doing:

java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'
at com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2157)
...

Upon inspection, the google-cloud-storage dependencies in pom.xml seem to be locked with the Maven BOM [EDITED], but maybe there's a version conflict? Not sure...
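
For context, `ImmutableMap.Builder.buildOrThrow()` was added in Guava 31.0, so a `NoSuchMethodError` like the one above often means an older Guava jar is being loaded instead of the one google-cloud-storage expects. A rough sketch for listing the Guava versions present under a directory, assuming the jars follow the usual `guava-<version>[-jre|-android].jar` naming; the function names and the directory in the usage comment are hypothetical:

```python
import re
from pathlib import Path

# Assumes the conventional file naming, e.g. guava-31.1-jre.jar or guava-20.0.jar.
GUAVA_JAR = re.compile(r"^guava-(\d+(?:\.\d+)*)(?:-(?:jre|android))?\.jar$")


def guava_versions(root):
    """Map each Guava version found under root to the jar paths carrying it."""
    found = {}
    for jar in sorted(Path(root).rglob("guava-*.jar")):
        match = GUAVA_JAR.match(jar.name)
        if match:
            found.setdefault(match.group(1), []).append(jar)
    return found


def is_older_than(version, minimum=(31, 0)):
    """True if version's major.minor is below minimum (default 31.0)."""
    parts = tuple(int(p) for p in version.split("."))
    # Pad so that "31" compares as (31, 0), then keep major.minor only.
    return (parts + (0, 0))[:2] < minimum


# Usage (the directory is a placeholder for the Connect plugin or lib path):
# for version, jars in guava_versions("/path/to/connect/plugins").items():
#     print(version, "older than 31.0" if is_older_than(version) else "ok", jars)
```

More than one version in the output, or any version below 31.0, would support the version-conflict theory, though it does not by itself show which jar the classloader picks.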

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

This issue was closed because it has been stalled for 30 days with no activity.