mmolimar/kafka-connect-fs

Files not being processed

ericbroda opened this issue · 3 comments

Your connector is probably one of the better-explained connectors available, and I really appreciate the work you have put into it. Unfortunately, I am having a problem getting it to process files.

The configurations are all provided below. My understanding is that when I add files to "/data", I should see their contents in my topic ("sample-topic"), but the files are not being processed.

Any ideas are appreciated!

The property file is set up as suggested (I am only using text files, located in "/data"):

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data
topic=sample-topic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=*
policy.batch_size=0
policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
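
For reference, the compose file below runs the worker in distributed mode, so this configuration has to be submitted through the Connect REST API rather than loaded from a properties file. A minimal sketch of the equivalent request, assuming the worker is reachable on localhost:8083:

# create the connector via the Connect REST API
curl -sX POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "FsSourceConnector",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "file:///data",
    "topic": "sample-topic",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.recursive": "true",
    "policy.regexp": "*",
    "policy.batch_size": "0",
    "policy.cleanup": "none",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader",
    "file_reader.batch_size": "0"
  }
}'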

I have built the connector Docker image and updated it for Confluent Platform v6.2.0:

FROM confluentinc/cp-kafka-connect-base:6.2.0

ARG PROJECT_VERSION
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

COPY ./staging/mmolimar-kafka-connect-fs-${PROJECT_VERSION}.zip /tmp/kafka-connect-fs.zip

RUN confluent-hub install --no-prompt /tmp/kafka-connect-fs.zip
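
A sketch of the corresponding build command, assuming the 1.3.0 release zip is in ./staging (1.3.0 matching the plugin version reported below):

# bake the FS connector into a custom Connect image
docker build --build-arg PROJECT_VERSION=1.3.0 -t kafka-connect-fs:latest .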

I can see the connector (the first in the list):

curl -sX GET http://localhost:8083/connector-plugins | grep FsSourceConnector

[{"class":"com.github.mmolimar.kafka.connect.fs.FsSourceConnector","type":"source","version":"1.3.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"6.2.0-ccs"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"6.2.0-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

And I have made minor changes to the docker-compose file (updated to 6.2.0 and added a volume on the connector service mapping to "/data", which I confirmed is visible to Connect):

version: '3'
services:
  cp-zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  cp-kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - cp-zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'

  cp-schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - cp-zookeeper
      - cp-kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect-fs:
    image: kafka-connect-fs:latest
    container_name: connect
    depends_on:
      - cp-kafka
      - cp-schema-registry
    ports:
      - "8083:8083"
      - "8000:8000"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      KAFKA_OPTS: "-agentlib:jdwp=transport=dt_socket,server=y,address=8000,suspend=n"
    volumes:
      - ${PROJECT_DIR}/data:/data
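
Before creating the connector, both the volume mount and the plugin installation can be confirmed from inside the container; a sketch, using the container name from the compose file:

# the host directory should be visible where fs.uris points
docker exec connect ls -la /data

# the plugin should sit on the configured plugin path
docker exec connect ls /usr/share/confluent-hub-components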

Now that everything has started successfully, I have created the topic ("sample-topic") named in the properties file and moved files into "/data", but nothing appears in "sample-topic".
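
Since the worker uses the AvroConverter for values, the topic is easiest to inspect with the Avro console consumer; a sketch, assuming the tool is available in the schema-registry container (it ships with the Confluent Platform images):

# read everything written to the topic so far
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic sample-topic \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081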

Hi @ericbroda!
Thanks for your kind words.

What are the filenames you have in that directory? Have you tried another policy, such as the Sleepy or Cron policy? What can you see in the connector logs?
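
For the log check, tailing the worker output and filtering on the connector's package shows whether the policy is executing at all; a sketch, using the container name from the compose file above:

# follow the worker logs and keep only the FS connector's lines
docker logs -f connect 2>&1 | grep -i mmolimar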

Thank you for the quick response!

Regarding your questions:

  1. I have only two files in the "/data" directory: "file1.txt" and "file2.txt"
  2. I have not yet tried the other policies; I was hoping the default one would suffice.
  3. There is nothing in the connector logs other than that Connect has started.

Can you try another policy and also change the policy.regexp config to policy.regexp=^.*\.txt$?
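
The regexp is the likely culprit: a bare * is not a valid Java regular expression (a dangling quantifier), so the policy's file filter can never match anything. Per the connector docs, the Simple policy also processes the URIs just once, so files added after that first pass are never picked up. A sketch of both suggested changes; policy.sleepy.sleep is the poll interval in milliseconds per my reading of the connector docs, so double-check the exact property name:

# re-scan /data every 10 seconds instead of only once
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=10000

# match the actual .txt filenames; a bare * is not a valid Java regex
policy.regexp=^.*\.txt$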