Kafka::KafkaWriter
=================================

Kafka Writer plugin for Bro. This plugin connects to a Kafka server and sends JSON-formatted logs to the topic you specify.

Aside from sending Bro logs to Kafka, it also adds two fields to each JSON message and renames five of the fields from their native Bro names.

The added fields are:

* "sensor": a sensor name, as specified in the config, used to identify which Bro server originated the log.
* "type": the log type (dns, ssl, conn, etc.).

The renamed fields are:

* "ts" becomes "timestamp"
* "id.orig_h" becomes "source_ip"
* "id.orig_p" becomes "source_port"
* "id.resp_h" becomes "dest_ip"
* "id.resp_p" becomes "dest_port"

Users can override the sensor name with a standard redef in their config. The names used in the "type" field can be changed by editing the init.bro script for the KafkaLogger (though I don't recommend it). I would like to make the field renaming configurable in the future, but I had problems with the Bro table structure in C++, so those names are hard-coded in the C++ for the moment.

(Note: timestamps are sent as millisecond timestamps, i.e. a UNIX-format timestamp that includes the milliseconds, aka epoch * 1000.)

Configuration
================================

In your Bro policy, include the logs-to-kafka Bro script and define the particular logs you want to send to Kafka, like this:

```
@load Kafka/KafkaWriter/logs-to-kafka
redef KafkaLogger::logs_to_send = set(HTTP::LOG, Conn::LOG, DNS::LOG, SMTP::LOG);
```

See the Kafka init.bro file for the full list of logs you can flag to be sent to Kafka. (That file also contains the clean names that the logs will be given in the "type" field.)

You can also redefine how the Bro client connects to Kafka with the following variables:

```
redef KafkaLogger::topic_name = "logs";
```

Which Kafka topic to send logs to. All logs from Bro will be sent to this topic.

```
redef KafkaLogger::broker_name = "hostname:port,hostname:port";
```

The Kafka broker(s) to send messages to, as a comma-separated list in the format hostname1:port,hostname2:port with no spaces.

```
redef KafkaLogger::sensor_name = "brosensor";
```

The name added to each message to identify this sensor in the logs. It is added to the "sensor" field in the submitted JSON.

```
redef KafkaLogger::client_id = "bro";
```

This is used internally by Kafka to trace requests and errors; Kafka advises setting it to identify the application making the requests/produces. The log name will be appended to this in the Kafka logs.

```
redef KafkaLogger::compression_codec = "0";
```

Kafka can compress messages on the wire. This specifies which compression codec to use: 0=none, 1=gzip, 2=snappy.

```
redef KafkaLogger::max_batch_size = "1000";
redef KafkaLogger::max_batch_interval = "10000";
```

How messages are batched before being sent: the maximum number of messages per batch, and the maximum interval to wait before sending a batch.

If you use a Kerberised version of Kafka, you'll need to switch the security protocol from PLAINTEXT to SASL_PLAINTEXT or SASL_SSL, and make sure you've set up the keytab properly. See https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka for more information on how to do that. You will also need a version of librdkafka built with SSL and Kerberos support.

```
redef KafkaLogger::security_protocol = "SASL_PLAINTEXT";
```

```
redef KafkaLogger::kerberos_service_name = "kafka";
```

Needs to be redefined to match the service name for Kafka.

```
redef KafkaLogger::kerberos_keytab = "/etc/security/keytabs/bro.keytab";
```

Needs to be redefined if your keytab file is stored elsewhere.
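Taken together, a minimal sketch of the Kerberos-related settings in a policy might look like the following. The broker name, port, and paths are placeholder values, and the kerberos_principal setting described next is also required:

```
@load Kafka/KafkaWriter/logs-to-kafka

# Example values only; adjust for your brokers and keytab location.
redef KafkaLogger::broker_name = "kafkabroker.domain.com:9092";
redef KafkaLogger::security_protocol = "SASL_PLAINTEXT";
redef KafkaLogger::kerberos_service_name = "kafka";
redef KafkaLogger::kerberos_keytab = "/etc/security/keytabs/bro.keytab";
# KafkaLogger::kerberos_principal must also be set; see below.
```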
```
redef KafkaLogger::kerberos_principal = "kafka/kafkabroker@domain.com";
```

Note: the <kafkabroker> part must exactly match the hostname part of the broker principal.

If, after setting this up, you get an error about "no common mechs", you probably need to install additional SASL modules. On Debian and derivatives,

```
apt-get install libsasl2-modules-gssapi-heimdal
```

should do the trick.

Lastly, if you are consuming the events with Logstash, you will need to set

```
redef KafkaLogger::logstash_style_timestamp = T;
```

This puts quotes around the "timestamp" field. Logstash apparently expects the timestamp to be a string, even if you configure it to look for UNIX timestamp values. (You will also want to configure Logstash with a filter such as

```
filter {
  date {
    match => ["timestamp", "UNIX_MS"]
  }
}
```

in order to get it to parse the timestamps properly.)

Requirements
================================

You will need to install the librdkafka library independently of this plugin. The build system will attempt to find the librdkafka libraries, but if you put them somewhere unusual, you will need to tell configure where to find them with the --with-librdkafka option.

Notes & thank yous
===============================

Much of this plugin is based on earlier code from Kurt Grutzmacher, who wrote a Bro 2.2 logger for Kafka. Also, many thanks to Robin Sommer for answering my beginner-level questions as I started this process.

Lastly, this plugin creates a new connection from Bro to Kafka for each log type that is sent to Kafka, which may result in a lot of connections. At the moment the plugin does not try to reconnect if it loses all connectivity to Kafka, but the librdkafka library should handle failover between brokers transparently.
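For reference, a minimal sketch of a policy that pulls the basic (non-Kerberos) settings together might look like this; the topic, broker, and sensor names here are example values and should be adjusted for your environment:

```
@load Kafka/KafkaWriter/logs-to-kafka

# Which Bro logs to ship to Kafka.
redef KafkaLogger::logs_to_send = set(HTTP::LOG, Conn::LOG, DNS::LOG);

# Example destination and identification values.
redef KafkaLogger::topic_name = "logs";
redef KafkaLogger::broker_name = "kafka1.example.com:9092,kafka2.example.com:9092";
redef KafkaLogger::sensor_name = "brosensor";

# Only needed if the events are consumed by Logstash.
redef KafkaLogger::logstash_style_timestamp = T;
```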