Overview
This Kafka Connect connector watches an input directory and reads data as new files are written to it. The RecordProcessor implementation can be overridden, so any file type can be supported. There is currently support for delimited files and for reading a file line by line.
The CSVRecordProcessor supports reading CSV or TSV files. It can convert a CSV file on the fly to strongly typed Kafka Connect data, and it currently supports all of the schema types and logical types available in Kafka 0.10.x. If you couple this with the Avro converter and Confluent Schema Registry, you can process CSV files into strongly typed Avro data in real time.
The LineRecordProcessor supports reading a file line by line and emitting each line as a record.
Building on your workstation
```bash
git clone git@github.com:jcustenborder/kafka-connect-spooldir.git
cd kafka-connect-spooldir
mvn clean package
```
Running on your workstation
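One way to run the connector is with the standalone Connect worker that ships with Apache Kafka. The sketch below makes a few assumptions: the jar name, the worker properties file (`connect-standalone.properties`), and the connector properties file (`spooldir-source.properties`) are illustrative placeholders, not files shipped with this project.

```bash
# Put the packaged connector on the worker's classpath (jar name is illustrative).
export CLASSPATH="$(pwd)/target/kafka-connect-spooldir-1.0-SNAPSHOT.jar"

# Start a standalone Connect worker with a worker config and a connector config.
$KAFKA_HOME/bin/connect-standalone.sh connect-standalone.properties spooldir-source.properties
```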
Schema Configuration
This connector allows you either to infer a schema of nullable strings from the header row or to specify the schema in JSON format.
To use automatic schema generation, set csv.first.row.as.header=true, csv.schema.from.header=true, and csv.schema.from.header.keys=key1,key2, as shown in the snippet below.
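For example, in the connector's properties file (the key field names key1 and key2 are examples carried over from the sentence above):

```properties
# Infer a schema of nullable strings from the header row.
csv.first.row.as.header=true
csv.schema.from.header=true
# Header fields to use as the record key (example names).
csv.schema.from.header.keys=key1,key2
```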
To manually define the schema, set csv.schema
to a JSON representation of the schema. An example for the mock data can be found in the test class.
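A minimal sketch of what such a JSON schema can look like, assuming the Connect schema serialization used in this project's documentation; the struct name, field names, and types here are illustrative rather than the actual mock data:

```json
{
  "name" : "com.example.MockData",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "id" : { "type" : "INT64", "isOptional" : false },
    "first_name" : { "type" : "STRING", "isOptional" : true },
    "last_name" : { "type" : "STRING", "isOptional" : true }
  }
}
```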
Configuration options
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
error.path | The directory in which to place files that have errors. This directory must exist and be writable by the user running Kafka Connect. | string | | | high
finished.path | The directory in which to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high
input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename, the equivalent of Matcher.matches(). | string | | | high
input.path | The directory from which files are read and processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high
record.processor.class | Class that implements RecordProcessor. This class is used to process data as it arrives. | class | | | high
topic | The Kafka topic to write the data to. | string | | | high
halt.on.error | Whether the task should halt when it encounters an error or continue to the next file. | boolean | true | | high
csv.first.row.as.header | Flag to indicate if the first row of data contains the header of the file. | boolean | false | | medium
csv.schema | Schema representation in JSON. | string | "" | | medium
batch.size | The number of records that should be returned with each batch. | int | 1000 | | low
csv.case.sensitive.field.names | Flag to determine if the field names in the header row should be treated as case sensitive. | boolean | false | | low
csv.escape.char | Escape character. | int | 92 | | low
csv.file.charset | Character set to read the file with. | string | UTF-8 | | low
csv.ignore.leading.whitespace | Sets the ignore leading whitespace setting - if true, white space in front of a quote in a field is ignored. | boolean | true | | low
csv.ignore.quotations | Sets the ignore quotations mode - if true, quotations are ignored. | boolean | false | | low
csv.keep.carriage.return | Flag to determine if the carriage return at the end of the line should be maintained. | boolean | false | | low
csv.null.field.indicator | Indicator to determine how the CSV reader determines whether a field is null. Valid values are EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER. For more information see http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html. | string | NEITHER | ValidEnum{enumClass=CSVReaderNullFieldIndicator, validEnums=[NEITHER, EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH]} | low
csv.parser.timestamp.date.formats | The date formats that are expected in the file. This is a list of strings that will be used, in order, to parse the date fields. The most accurate date format should be first in the list. See the Java documentation for more info: https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html | list | [yyyy-MM-dd' 'HH:mm:ss] | | low
csv.parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low
csv.quote.char | The character that is used to quote a field. Quoting typically happens when the csv.separator.char character is within the data. | int | 34 | | low
csv.schema.from.header | Flag to determine if the schema should be generated based on the header row. | boolean | false | | low
csv.schema.from.header.keys | The header field names to use as the key for the record when the schema is generated from the header row. | list | [] | | low
csv.schema.name | Fully qualified name for the schema. This setting is ignored if csv.schema is set. | string | "" | | low
csv.separator.char | The character that separates each field. In a CSV this is typically the , character; a TSV would use \t. | int | 44 | | low
csv.skip.lines | Number of lines to skip at the beginning of the file. | int | 0 | | low
csv.strict.quotes | Sets the strict quotes setting - if true, characters outside the quotes are ignored. | boolean | false | | low
csv.verify.reader | Flag to determine if the reader should be verified. | boolean | true | | low
file.minimum.age.ms | The amount of time in milliseconds after the file was last written before it can be processed. | long | 0 | [0,...,9223372036854775807] | low
include.file.metadata | Flag to determine if metadata about the file should be included. | boolean | false | | low
processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This extension is appended to the end of the filename. | string | .PROCESSING | ValidPattern{pattern=^.*\..+$} | low
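As a worked example, a connector configuration tying the settings above together might look like the following. The directory paths, topic name, and file pattern are illustrative, and the connector and processor class names are placeholders; take the fully qualified names from this repository's source.

```properties
name=csv-spooldir-source
# Placeholder class names; use the fully qualified names from this project.
connector.class=SpoolDirectorySourceConnector
record.processor.class=CSVRecordProcessor
tasks.max=1
topic=csv-data

# All three directories must exist and be writable by the Connect user.
input.path=/data/input
finished.path=/data/finished
error.path=/data/error
input.file.pattern=^.*\.csv$

# Generate a schema of nullable strings from the header row,
# and stop the task on the first error.
csv.first.row.as.header=true
csv.schema.from.header=true
halt.on.error=true
```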