Kafka Connect OSS
Now, you can use this connector as a sink, to upload data from kafka topics to OSS in Json, Avro or Parquet format.
This connector periodically polls data from Kafka and in turn uploads it to OSS. You can configure a partitioner to split data into chunks, and each chunk of data is represented as an OSS object.The key name encodes the topic, the Kafka partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner which preserves Kafka partitioning is used. The size of each data chunk is determined by the number of records written to OSS and by schema compatibility.
License
It is under Apache License, Version 2.0, meaning you are pretty much free to use it however you want in whatever way.
Limitations
Kafka 0.11.0.0 or above(0.11.x, 1.x, 2.x) is required. Other versions(0.10.x or below) will not be supported now.
Install OSS Connector
Compile and Deployment
Execute commands below:
git clone https://github.com/aliyun/kafka-connect-oss.git
cd kafka-connect-oss
mvn clean install -DskipTests
The generated jar library(kafka-connect-oss-5.2.0.jar) will be in the target directory, copy it to $KAFKA_HOME/libs of each node.
Setup Hadoop configuration
This connector depends on hadoop oss filesystem interface to upload data to OSS, so we should add core-site.xml configuration file to $KAFKA_HOME/config directory:
[root@master ~]# cd $KAFKA_HOME
[root@master kafka_2.11-0.11.0.0]# ls config
connect-console-sink.properties connect-file-sink.properties connect-standalone.properties log4j.properties tools-log4j.properties
connect-console-source.properties connect-file-source.properties consumer.properties producer.properties zookeeper.properties
connect-distributed.properties connect-log4j.properties core-site.xml server.properties
Then, we should add $KAFKA_HOME/config to CLASSPATH by changing $KAFKA_HOME/bin/kafka-run-class.sh before we start kafka connector cluster.
Add this command to $KAFKA_HOME/bin/kafka-run-class.sh.
[root@master kafka_2.11-0.11.0.0]# vim $KAFKA_HOME/bin/kafka-run-class.sh
CLASSPATH=$CLASSPATH:$base_dir/config
We can see the diff(bin/kafka-run-class.sh.bak is original file)
[root@master kafka_2.11-0.11.0.0]# diff -y bin/kafka-run-class.sh bin/kafka-run-class.sh.bak
CLASSPATH=$CLASSPATH:$base_dir/config <
# Launch mode # Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $K nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $K
else else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KA exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KA
fi fi
You should add configurations below to core-site.xml
configuration | value | comments |
---|---|---|
fs.oss.endpoint | e.g. oss-cn-zhangjiakou-internal.aliyuncs.com | endpoint to connect to, endpoints |
fs.oss.accessKeyId | access key id | |
fs.oss.accessKeySecret | access key secret | |
fs.oss.impl | org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem | hadoop oss filesystem implementation |
fs.oss.buffer.dir | /tmp/oss | Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS |
fs.oss.connection.secure.enabled | false | Connect to oss over ssl or not, true by default |
fs.oss.connection.maximum | 2048 | Number of simultaneous connections to oss |
You can find the details here
Setup Kafka connector
You should setup kafka connector cluster, please refer to official documentation here: https://docs.confluent.io/current/connect/userguide.html#distributed-worker-configuration.
You should configure bootstrap.servers
and group.id
in $KAFKA_HOME/config/connect-distributed.properties of each node.
Each node in kafka connector cluster should have the same group.id
, and then execute command below in each node:
cd $KAFKA_HOME && nohup bin/connect-distributed.sh config/connect-distributed.properties &
Other configurations
In order to upload data to OSS, you should add oss.bucket
configuration when you post connector tasks(see Quick Start below).
In order to use ParquetJsonFormat, you should add oss.parquet.protobuf.schema.class
configuration when you post connector tasks(see Quick Start below).
oss.parquet.protobuf.schema.class
is a list, the first value is the kafka topic name, and the second value is the schema class of the kafka topic.
Documentation
Formats supported
In order to use these formats, you should configure value.converter
in $KAFKA_HOME/config/connect-distributed.properties and format.class
when you post connector tasks to kafka connect cluster.
format | format.class | value.converter |
---|---|---|
Byte array | com.aliyun.oss.connect.kafka.format.bytearray.ByteArrayFormat | org.apache.kafka.connect.converters.ByteArrayConverter |
Avro | com.aliyun.oss.connect.kafka.format.avro.AvroFormat | io.confluent.connect.avro.AvroConverter |
Json | com.aliyun.oss.connect.kafka.format.json.JsonFormat | org.apache.kafka.connect.json.JsonConverter |
Avro to Parquet | com.aliyun.oss.connect.kafka.format.parquet.ParquetAvroFormat | io.confluent.connect.avro.AvroConverter |
Json to Parquet | com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat | org.apache.kafka.connect.storage.StringConverter |
Details of ParquetJsonFormat
In order to convert json format to parquet format, we need schema of json input. So you should define your schema in google protocol buffer language.
We have an example in src/main/proto/parquet.proto
There are two ways to provide schema classes:
- You can simply put your schema proto file under src/main/proto/ and compile with this repo(Recommended).
- You can compile proto file by yourself, package it and put the final jar in CLASSPATH($KAFKA_HOME/libs)
Exactly-once delivery
The OSS connector is able to provide exactly-once semantics to consumers of the objects it exports to OSS, under the condition that the connector is supplied with a deterministic partitioner.
Currently, out of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner
can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size
and schema.compatibility
to decide when to roll and save a new file to OSS. The connector always delivers files in OSS that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file does not become visible to OSS. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the OSS bucket, who at any time will have access to the same records made eventually available by successful uploads to OSS.
To guarantee exactly-once semantics with the TimeBasedPartitioner
, the connector must be configured to use a deterministic implementation of TimestampExtractor
and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record
) or record fields (timestamp.extractor=RecordField
). The deterministic rotation strategy configuration is rotate.interval.ms
(setting rotate.schedule.interval.ms
is non deterministic and will invalidate exactly-once guarantees).
Schema Evolution
The OSS connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility
configuration.
-
NO Compatibility: Default value. This case will ensure that each file written to OSS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files
-
BACKWARD Compatibility: If a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing fields is backward compatible change to a schema, since when we encounter records written with the old schema that contain these fields we can just ignore them. Adding a field with a default value is also backward compatible.
If
BACKWARD
is specified in theschema.compatibility
, the connector keeps track of the latest schema used in writing data to OSS, and if a data record with a schema version larger than current latest schema arrives, the connector commits the current set of files and writes the data record with new schema to new files. For data records arriving at a later time with schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files to OSS. -
FORWARD Compatibility: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.
If
FORWARD
is specified in theschema.compatibility
, the connector projects the data to the oldest schema before writing to the same set of files to OSS. -
Full Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.
If
FULL
is specified in theschema.compatibility
, the connector performs the same action asBACKWARD
.
Schema evolution in the OSS connector works in the same way as in the HDFS connector
Pluggable Partitioner
The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner
class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner
class.
Quick start(Using ParquetJsonFormat as an example)
First, we should create a topic to test.
[root@master ~]# cd $KAFKA_HOME
[root@master kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic test
Assume we have 4 nodes cluster(192.168.0.172, 192.168.0.173, 192.168.0.174, 192.168.0.175), and we post connector task:
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \ '{
"name":"oss-sink",
"config": {
"name":"oss-sink",
"topics":"test",
"oss.parquet.protobuf.schema.class":"test,com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo",
"connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
"format.class":"com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat",
"flush.size":"10000",
"tasks.max":"4",
"storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"oss.bucket":"hadoop-oss-test",
"partition.duration.ms":"10000",
"path.format":"YYYY-MM-dd-HH",
"locale":"US",
"timezone":"Asia/Shanghai",
"rotate.interval.ms":"30000"
}}' \
'http://slave01:8083/connectors'
Test schema class(com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo) has already been compiled in the jar library(kafka-connect-oss-5.2.0.jar)
HTTP/1.1 201 Created
Date: Wed, 12 Jun 2019 16:40:19 GMT
Location: http://slave01:8083/connectors/oss-sink
Content-Type: application/json
Content-Length: 836
Server: Jetty(9.2.15.v20160210)
{
"name": "oss-sink",
"config": {
"name": "oss-sink",
"topics": "test",
"oss.parquet.protobuf.schema.class": "test,com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo",
"connector.class": "com.aliyun.oss.connect.kafka.OSSSinkConnector",
"format.class": "com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat",
"flush.size": "10000",
"tasks.max": "4",
"storage.class": "com.aliyun.oss.connect.kafka.storage.OSSStorage",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "Record",
"oss.bucket": "hadoop-oss-test",
"partition.duration.ms": "10000",
"path.format": "YYYY-MM-dd-HH",
"locale": "US",
"timezone": "Asia/Shanghai",
"rotate.interval.ms": "30000"
},
"tasks": [{
"connector": "oss-sink",
"task": 0
}, {
"connector": "oss-sink",
"task": 1
}, {
"connector": "oss-sink",
"task": 2
}, {
"connector": "oss-sink",
"task": 3
}]
}
Then, we can check status of this connector task by:
curl -i -X GET \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
'http://slave01:8083/connectors/oss-sink/status'
HTTP/1.1 200 OK
Date: Wed, 12 Jun 2019 16:41:18 GMT
Content-Type: application/json
Content-Length: 334
Server: Jetty(9.2.15.v20160210)
{
"name": "oss-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.0.174:8083"
},
"tasks": [{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.0.173:8083"
}, {
"state": "RUNNING",
"id": 1,
"worker_id": "192.168.0.172:8083"
}, {
"state": "RUNNING",
"id": 2,
"worker_id": "192.168.0.175:8083"
}, {
"state": "RUNNING",
"id": 3,
"worker_id": "192.168.0.174:8083"
}]
}
Then, we can write json data to kafka topic and see results from logs(You can also check from OSS console):
[2019-06-12 16:50:11,806] INFO Start to commit file oss://hadoop-oss-test/topics/test/2019-06-12-16/test+0+0000154000.parquet (com.aliyun.oss.connect.kafka.format.parquet.Parqu
etJsonRecordWriterProvider:131)
[2019-06-12 16:50:11,806] INFO Flushing mem columnStore to file. allocated memory: 1299286 (org.apache.parquet.hadoop.InternalParquetRecordWriter:165)
[2019-06-12 16:50:11,877] INFO File oss://hadoop-oss-test/topics/test/2019-06-12-16/test+0+0000154000.parquet committed (com.aliyun.oss.connect.kafka.format.parquet.ParquetJson
RecordWriterProvider:133)