kafka-connect-spooldir

Kafka Connect connector for reading CSV files into Kafka.

Overview

This Kafka Connect connector watches a directory and reads data as new files are written to the input directory. The RecordProcessor implementation can be overridden, so any file type can be supported. Currently there is support for delimited files and for reading a file line by line.

The CSVRecordProcessor supports reading CSV or TSV files. It can convert a CSV file on the fly to the strongly typed Kafka Connect data types, and it currently supports all of the schema types and logical types available in Kafka 0.10.x. If you couple this with the Avro converter and Schema Registry by Confluent, you can process CSV files into strongly typed Avro data in real time (see the worker configuration sketch below).

The LineRecordProcessor supports reading a file line by line and emitting each line as a record.
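
To get the Avro output mentioned above, the converter is configured on the Connect worker rather than on the connector itself. A minimal sketch of the relevant worker settings, assuming a Schema Registry running at localhost:8081:

    # Connect worker settings (sketch); the Schema Registry URL is an assumption
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081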

Building on your workstation

    git clone git@github.com:jcustenborder/kafka-connect-spooldir.git
    cd kafka-connect-spooldir
    mvn clean package
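
The build drops the connector jar under target/. One way to make it visible to Kafka Connect is to put it on the classpath before starting the worker; the jar name here is illustrative and depends on the project version:

    # Illustrative: substitute the jar name actually produced by the build
    export CLASSPATH="$(pwd)/target/kafka-connect-spooldir-1.0-SNAPSHOT.jar"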

Running on your workstation
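
Kafka Connect needs a properties file describing the connector instance. Below is a minimal sketch using the settings documented in the configuration table; the angle-bracketed class names are placeholders for the fully qualified names in the built jar, and the paths and topic name are assumptions.

    # spooldir.properties -- sketch; class names and paths are assumptions
    name=csv-spooldir-source
    connector.class=<fully qualified SpoolDirectory source connector class>
    tasks.max=1
    topic=csv-data
    input.path=/tmp/spooldir/input
    finished.path=/tmp/spooldir/finished
    error.path=/tmp/spooldir/error
    input.file.pattern=^.*\.csv$
    record.processor.class=<fully qualified CSVRecordProcessor class>
    csv.first.row.as.header=true
    csv.schema.from.header=true

With the input, finished, and error directories created, the connector can then be started with the standalone driver that ships with Apache Kafka:

    bin/connect-standalone.sh config/connect-standalone.properties spooldir.properties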

Schema Configuration

This connector allows you to either infer a schema with nullable strings from the header row, or to specify the schema in JSON format. To use automatic schema generation, set csv.first.row.as.header=true, csv.schema.from.header=true, and csv.schema.from.header.keys=key1,key2. To define the schema manually, set csv.schema to a JSON representation of the schema; a sketch of that representation follows.
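
As a hypothetical illustration of the JSON layout, the sketch below defines a struct with a handful of fields. The schema name, field names, and the exact attribute spelling (type names, optionality) are assumptions; the authoritative format is whatever the schema parser in this repository accepts.

    {
      "name": "com.example.users.User",
      "fields": [
        { "name": "id",         "type": "INT64",  "isOptional": false },
        { "name": "first_name", "type": "STRING", "isOptional": true },
        { "name": "last_name",  "type": "STRING", "isOptional": true },
        { "name": "email",      "type": "STRING", "isOptional": true }
      ]
    }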

Configuration options

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| error.path | The directory where files that had errors are placed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
| finished.path | The directory where successfully processed files are placed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
| input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename, i.e. the equivalent of Matcher.matches(). | string | | | high |
| input.path | The directory to read input files from. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
| record.processor.class | Class that implements RecordProcessor. This class is used to process data as it arrives. | class | | | high |
| topic | The Kafka topic to write the data to. | string | | | high |
| halt.on.error | Whether the task should halt when it encounters an error, or continue to the next file. | boolean | true | | high |
| csv.first.row.as.header | Flag to indicate if the first row of data contains the header of the file. | boolean | false | | medium |
| csv.schema | Schema representation in JSON. | string | "" | | medium |
| batch.size | The number of records that should be returned with each batch. | int | 1000 | | low |
| csv.case.sensitive.field.names | Flag to determine if the field names in the header row should be treated as case sensitive. | boolean | false | | low |
| csv.escape.char | Escape character. | int | 92 | | low |
| csv.file.charset | Character set to read the file with. | string | UTF-8 | | low |
| csv.ignore.leading.whitespace | Sets the ignore leading whitespace setting: if true, white space in front of a quote in a field is ignored. | boolean | true | | low |
| csv.ignore.quotations | Sets the ignore quotations mode: if true, quotations are ignored. | boolean | false | | low |
| csv.keep.carriage.return | Flag to determine if the carriage return at the end of the line should be maintained. | boolean | false | | low |
| csv.null.field.indicator | Indicator to determine how the CSV reader decides if a field is null. For more information see http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html. | string | NEITHER | NEITHER, EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH | low |
| csv.parser.timestamp.date.formats | The date formats that are expected in the file, as a list of strings used in order to parse the date fields. The most accurate date format should be first in the list. See https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html for details. | list | [yyyy-MM-dd' 'HH:mm:ss] | | low |
| csv.parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low |
| csv.quote.char | The character that is used to quote a field. Quoting is typically needed when the csv.separator.char character appears within the data. | int | 34 | | low |
| csv.schema.from.header | Flag to determine if the schema should be generated based on the header row. | boolean | false | | low |
| csv.schema.from.header.keys | The header field name(s) to use when building the key schema. | list | [] | | low |
| csv.schema.name | Fully qualified name for the schema. This setting is ignored if csv.schema is set. | string | "" | | low |
| csv.separator.char | The character that separates each field. In a CSV this is typically a , character; a TSV would use \t. | int | 44 | | low |
| csv.skip.lines | Number of lines to skip at the beginning of the file. | int | 0 | | low |
| csv.strict.quotes | Sets the strict quotes setting: if true, characters outside the quotes are ignored. | boolean | false | | low |
| csv.verify.reader | Flag to determine if the reader should be verified. | boolean | true | | low |
| file.minimum.age.ms | The amount of time in milliseconds after the file was last written to before the file can be processed. | long | 0 | [0,...,9223372036854775807] | low |
| include.file.metadata | Flag to determine if the metadata about the file should be included. | boolean | false | | low |
| processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This extension is appended to the end of the file name. | string | .PROCESSING | ValidPattern{pattern=^.*..+$} | low |
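
Note that the character-typed settings (csv.separator.char, csv.quote.char, csv.escape.char) take numeric code points rather than literal characters, so switching to TSV is mostly a matter of changing the separator. A sketch using only keys from the table above, where 9 is the code point for tab:

    # TSV variant (sketch): tab separator, default quote/escape characters
    csv.separator.char=9
    csv.quote.char=34
    csv.escape.char=92
    csv.file.charset=UTF-8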