streamthoughts/kafka-connect-file-pulse

Getting a timeout exception for the S3 source connector, related to tasks.file.status.storage.bootstrap.servers

Opened this issue · 1 comment

Setup Details
I'm running FilePulse 2.13.0 (https://github.com/streamthoughts/kafka-connect-file-pulse/releases/download/v2.13.0/streamthoughts-kafka-connect-file-pulse-2.13.0.zip) on macOS.

Concern
I'm trying to create an S3 source connector using FilePulse, and the connector is linked to a topic in IBM Cloud's Event Streams. I'm not sure what to set tasks.file.status.storage.bootstrap.servers to. When I set it to localhost:9092, I get a timeout exception. The same occurs when I set the field to the cluster's internal listener, development-kafka-bootstrap.cp4i.svc:9093. If I comment out the field entirely, I get a "Failed to create shared StateBackingStore for group 'connect-file-pulse-amazon-s3-csv'" error. What should I set it to? Is there any other configuration I'm missing?

Here's my connector's YAML:

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: connect-file-pulse-amazon-s3-csv
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
  tasksMax: 1
  config:
    topic: XXXX
    tasks.max: 1
    fs.listing.class: io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
    fs.listing.interval.ms: 10000
    # fs.listing.filters: io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.IgnoreHiddenFileListFilter
    file.filter.regex.pattern: .*\.csv$  # note: in YAML the backslash should not be doubled, or the regex will not match .csv files

    fs.cleanup.policy.class: io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy 

    aws.access.key.id: XXXX
    aws.secret.access.key: XXXX
    aws.s3.region: us-east-2
    aws.s3.bucket.name: XXXX

    tasks.reader.class: io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader

    skip.headers: 1
    offset.attributes.string: uri

    filters: ParseLine
    filters.ParseLine.type: io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
    filters.ParseLine.extractColumnName: headers
    filters.ParseLine.trimColumn: true
    filters.ParseLine.separator: ;
    tasks.file.status.storage.bootstrap.servers: localhost:9092
    tasks.file.status.storage.topic: connect-file-pulse-status
    tasks.file.status.storage.topic.partitions: 10
    # tasks.file.status.storage.topic.replication.factor: 1

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.