Kafka::KafkaWriter
=================================

Kafka Writer plugin for Bro. This plugin connects to a Kafka server and sends JSON-format
logs to the specified topic.

Aside from sending bro logs to Kafka, it will also add two fields to each JSON message, and rename
five of the fields from their bro-native names.

The added fields are:
	a "sensor" name, as specified in the config, used to identify which bro server originated this log;
	and a "type" field, populated with the log type (dns, ssl, conn, etc.).
	
The renamed fields are: 
	"ts" will become "timestamp", 
	"id.orig_h"becomes "source_ip", 
	"id.orig_p" becomes "source_port", 
	"id.resp_h" becomes "dest_ip", and 
	"id.resp_p" becomes "dest_port". 
	
	
Users can override the sensor name with a standard redef in their config. Changing the log-type names
used in the "type" field can be done by editing the init.bro script for the KafkaLogger (though I don't
recommend it). I would like to make the field renaming configurable in the future, but had problems
with the bro table structure in C++, so those names are hard-coded in the C++ for the moment.

(Note: timestamps are sent as millisecond timestamps, i.e. a UNIX-format timestamp that includes
the milliseconds, aka epoch seconds * 1000.)
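
For illustration, a dns record might look roughly like this once the fields have been added and
renamed; the remaining bro fields keep their native names (every value here is invented):

	{
		"timestamp": 1457031543123,
		"sensor": "brosensor",
		"type": "dns",
		"source_ip": "10.1.2.3",
		"source_port": 53122,
		"dest_ip": "10.4.5.6",
		"dest_port": 53,
		...
	}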

Configuration
================================
In your bro policy, include the logs-to-kafka bro script, and define the particular logs you want 
to send to Kafka, like this:

	@load Kafka/KafkaWriter/logs-to-kafka
	redef KafkaLogger::logs_to_send = set(HTTP::LOG, Conn::LOG, DNS::LOG, SMTP::LOG);
	
See the Kafka init.bro file for the full list of logs you can flag to be sent to Kafka. (That file
also contains the clean names that each log will be given in the "type" field.)
	
You can also redefine how the bro client will connect to kafka with the following variables:

	redef KafkaLogger::topic_name = "logs"; 
		which Kafka topic to send logs to. All logs from bro will be sent to this topic.
	
	
	redef KafkaLogger::broker_name = "hostname:port,hostname:port";
		A comma-separated list of the kafka brokers to send
		messages to, with no spaces.
			format: hostname1:port,hostname2:port
	
	
	redef KafkaLogger::sensor_name = "brosensor";
		The name to add to each message to identify
		this sensor in the logs. It will be added to a "sensor"
		field in the submitted json.
	
	 
	redef KafkaLogger::client_id = "bro"; 
		This is used internally by Kafka to trace requests and
		errors. Kafka advises setting it to identify the 
		application making the requests/produces. The log name
		will be appended to this in the Kafka logs.
	
	
	redef KafkaLogger::compression_codec = "0"; 
		Kafka can compress messages on the wire. This specifies
		which compression codec to use. 0=none, 1=gzip, 2=snappy.
	
	
	redef KafkaLogger::max_batch_size = "1000"; 
	redef KafkaLogger::max_batch_interval = "10000";
		How often to send logs: the maximum number of messages
		to batch together, and the maximum interval to wait
		before a batch is sent regardless of its size.
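
Putting these together, a minimal configuration sketch might look like this (the topic name, broker
list, and sensor name are placeholders; substitute your own):

	@load Kafka/KafkaWriter/logs-to-kafka
	redef KafkaLogger::logs_to_send = set(Conn::LOG, DNS::LOG, HTTP::LOG);
	redef KafkaLogger::topic_name = "bro_logs";
	redef KafkaLogger::broker_name = "kafka1.example.com:9092,kafka2.example.com:9092";
	redef KafkaLogger::sensor_name = "edge-sensor-01";
	redef KafkaLogger::compression_codec = "1";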

If you use the Kerberised version of kafka, you'll need to switch the security protocol from PLAINTEXT
to SASL_PLAINTEXT or SASL_SSL, and make sure you've set up the keytab properly; see
https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka for more info on how to do that.
You will also need a version of librdkafka built with SSL and Kerberos support.
	
	redef KafkaLogger::security_protocol = "SASL_PLAINTEXT";

	redef KafkaLogger::kerberos_service_name = "kafka";
		Needs to be redefined to match the service name for Kafka.

	redef KafkaLogger::kerberos_keytab = "/etc/security/keytabs/bro.keytab";
		Needs to be redefined if your keytab file is stored elsewhere.
		
	redef KafkaLogger::kerberos_principal = "kafka/kafkabroker@domain.com";
		Note: the <kafkabroker> must exactly match the hostname part of the broker principal. 
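
Taken together, a Kerberised setup might look like this sketch (the broker host, realm, and keytab
path are placeholders; note that the hostname in the principal matches the broker's hostname):

	redef KafkaLogger::broker_name = "kafkabroker.example.com:9092";
	redef KafkaLogger::security_protocol = "SASL_PLAINTEXT";
	redef KafkaLogger::kerberos_service_name = "kafka";
	redef KafkaLogger::kerberos_keytab = "/etc/security/keytabs/bro.keytab";
	redef KafkaLogger::kerberos_principal = "kafka/kafkabroker.example.com@EXAMPLE.COM";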

If, after setting up, you get an error about "no common mechs", you probably need to install additional
SASL modules. On Debian and derivatives,

	apt-get install libsasl2-modules-gssapi-heimdal

should do the trick.

Lastly, if you are consuming the events with logstash, you will need to set
	redef KafkaLogger::logstash_style_timestamp = T;
		This will set quotes around the "timestamp" field. Logstash apparently
		expects the timestamp to be a string, even if you configure it to
		look for unix timestamp values. (BTW, you will want to configure
		logstash with a filter that says: 
			filter { date { match => ["timestamp", "UNIX_MS"] } }
		in order to get it to parse the timestamps properly.)

Requirements
================================
You will need to install the librdkafka library independently of this plugin. The build system will
attempt to find the librdkafka libraries, but if you've put them somewhere unusual, you will need to
tell configure where to find them with the --with-librdkafka option.
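
For example, if librdkafka were installed under a (hypothetical) prefix of /opt/librdkafka:

	./configure --with-librdkafka=/opt/librdkafka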


Notes & thank yous
===============================
Much of this plugin is based on earlier code from Kurt Grutzmacher, who wrote a bro 2.2 logger for kafka.
Also, many thanks to Robin Sommer for answering my beginner-level questions as I started this process.

Lastly, this plugin will create a new connection from Bro to kafka for each log type to be sent to 
kafka. This may cause a lot of connections. At the moment, the plugin does not try to re-connect if
it loses all connectivity to kafka, but the librdkafka library should handle failover between brokers
transparently.