/opentsdb-rpc-kafka

A set of OpenTSDB plugins for consuming from Apache Kafka

Primary LanguageJavaGNU Lesser General Public License v2.1LGPL-2.1

   ___                 _____ ____  ____  ____
  / _ \ _ __   ___ _ _|_   _/ ___||  _ \| __ )
 | | | | '_ \ / _ \ '_ \| | \___ \| | | |  _ \
 | |_| | |_) |  __/ | | | |  ___) | |_| | |_) |
  \___/| .__/ \___|_| |_|_| |____/|____/|____/
       |_|    The modern time series database.

Build Status Coverage Status

Kafka RPC Plugin

This plugin allows OpenTSDB to consume messages from a Kafka cluster and write them directly to storage, bypassing the Telnet style or HTTP APIs. It includes a Storage Exception Handler plugin that will post messages back to a Kafka queue if writing to storage fails.

Installation

  1. Download the source code and run mvn package to create the jar in the target/ directory. Copy this file to your OpenTSDB plugin directory as defined in the opentsdb config via tsd.core.plugin_path.
  2. Setup the appropriate Kafka topics and partitions. More on that later.
  3. Add configuration settings to your opentsdb.conf file as described later on.
  4. Restart the TSD and make sure the plugin was loaded and associated with the proper ID. E.g. look in the logs for lines like:
2017-07-10 23:08:58,264 INFO  [main] RpcManager: Successfully initialized plugin [net.opentsdb.tsd.KafkaRpcPlugin] version: 2.4.0
2017-07-10 23:08:57,790 INFO  [main] TSDB: Successfully initialized storage exception handler plugin [net.opentsdb.tsd.KafkaStorageExceptionHandler] version: 2.4.0

Usage

The plugin expects (currently) JSON formatted messages flowing from the Kafka brokers. Each JSON message must include a type field with the value being one of the following:

  • Metric A single numeric measurement.
  • Aggregate A single numeric measurement that may be a pre-aggregate, a rolled up data point or both.
  • Histogram A single histogram measurement.

Each message is similar to the HTTP JSON messages in the OpenTSDB API with the addition of the type field so that the JSON deserializer can figure out what the message contains.

###Metric

The metric message appears as follows:

{
	"type": "Metric",
	"metric": "sys.cpu.user",
	"tags": {
		"host": "web01"
	},
	"timestamp": 1492641000,
	"value": 42
}

For field information, see [/api/put] (http://opentsdb.net/docs/build/html/api_http/put.html).

###Aggregate

Aggregate messages are the same as those documented in [/api/rollup] (http://opentsdb.net/docs/build/html/api_http/rollup.html).

###Histogram

Histogram messages are documented at [/api/histogram] (http://opentsdb.net/docs/build/html/api_http/histogram.html).

TODO We would also like to support more streamlined formats rather than JSON, work is underway for those.

Configuration

The following properties can be stored in the opentsdb.conf file:

Property Type Required Description Default Example
tsd.rpc.plugins String Required The full class name of the plugin. This must be net.opentsdb.tsd.KafkaRpcPlugin net.opentsdb.tsd.KafkaRpcPlugin
KafkaRpcPlugin.kafka.zookeeper.connect String Required The comma separated list of zookeeper hosts and ports used by the Kafka cluster. localhost:2181
KafkaRpcPlugin.groups String Required A comma separated list of one or more consumer group names. TsdbConsumer,TsdbRequeueConsumer
KafkaRpcPlugin.<GROUPNAME>.topics String Required A comma separated list of one or more topics for the <GROUPNAME> to consume from. TSDB_1,TSDB_2
KafkaRpcPlugin.<GROUPNAME>.consumerType String Required The type of messages written to the queue. TODO. For now, leave it as raw raw
KafkaRpcPlugin.<GROUPNAME>.rate Integer Required How many messages per second to throttle the total of consumer threads at for the consumer group 250000
KafkaRpcPlugin.<GROUPNAME>.threads Integer Required The number of consumer threads to create per group 4
tsd.http.rpc.plugins String Optional A comma separated list of HTTP RPC plugins to load. Included with this package is a plugin that allows for fetching stats from the Kafka plugin as well as viewing or modifying the write rate during runtime. net.opentsdb.tsd.KafkaHttpRpcPlugin
tsd.core.storage_exception_handler.enable Boolean Optional Whether or not to enable the storage exception handler plugin. false true
tsd.core.storage_exception_handler.plugin String Optional The full class of the storage exception handler plugin. net.opentsdb.tsd.KafkaStorageExceptionHandler
KafkaRpcPlugin.kafka.metadata.broker.list String Optional The comma separated list of Kafka brokers and ports used to write messages to for the storage exception handler plugin localhost:9092
KafkaRpcPlugin.seh.topic.default String Optional The topic used to write messages to for the storage exception handler. TSDB_Requeue

Note the KafkaRpcPlugin.groups and <GROUP_NAME> entries above. Kafka consumers belong to a particular group. The Kafka RPC plugin can launch multiple groups consuming from multiple topics so that OpenTSDB messages can be organized by type or source for more efficient control over rate limits and priorities. When setting the KafkaRpcPlugin.groups value, make sure you have a complete set of KafkaRpcPlugin.<GROUP_NAME>.* parameters per group or initialization will fail.