LogIsland is an event mining platform based on Spark and Kafka to handle a huge amount of log files.
You can start right now to play with LogIsland through the Docker image, by following the getting started guide
The documentation also explains how to build the source code in order to implement your own plugins.
Once you know how to run and build your own parsers and processors, you'll want to deploy and scale them.
- Raw log files are sent to Kafka topics by a NIFI / Logstash / Flume / Collectd (or whatever) agent
- Logs in Kafka topic are translated into Events and pushed back to another Kafka topic by a Spark streaming job
- Events in Kafka topic are sent to Elasticsearch (or Solr or whatever backend) for online analytics (Kibana or Banana) by a Spark streaming job
- Log topics can also dumped to HDFS (master dataset) for offline analytics
- Event component do some time window based analytics on events to build new events
A LogIsland stream processing flow is made of a bunch of components. At least one streaming engine and 1 or more stream processors. You set them up by a YAML configuration file. Please note that events are serialized against an Avro schema while transiting through any Kafka topic. Every spark.streaming.batchDuration
(time window), each processor will handle its bunch of Events to eventually generate some new events to the output topic.
The following conf/configuration-template.yml
contains a sample of processor definitions.
version: 0.1
documentation: LogIsland analytics main config file. Put here every engine or component config
components:
# Main event streaming engine
- component: com.hurence.logisland.engine.SparkStreamProcessingEngine
type: engine
version: 0.1.0
documentation: Main Logisland job entry point
configuration:
spark.master: local[8]
spark.executorMemory: 4g
spark.checkpointingDirectory: file:///tmp
spark.appName: My first stream component
spark.streaming.batchDuration: 2000
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.streaming.backpressure.enabled: true
spark.streaming.unpersist: false
spark.streaming.blockInterval: 350
spark.streaming.kafka.maxRatePerPartition: 500
spark.streaming.timeout: 20000
spark.ui.port: 4050
kafka.metadata.broker.list: localhost:9092
kafka.zookeeper.quorum: localhost:2181
# A Debug component that only logs what it reads
- component: com.hurence.logisland.processor.debug.EventDebuggerProcessor
type: processor
version: 0.1.0
documentation: a processor that trace the processed events
configuration:
kafka.input.topics: logisland-mock-in
kafka.output.topics: none
kafka.error.topics: none
avro.input.schema: |
{"version":1,"type":"record","namespace":"com.hurence.logisland","name":"Event","fields":[{"name":"_type","type":"string"},{"name":"_id","type":"string"},{"name":"timestamp","type":"long"},{"name":"method","type":"string"},{"name":"ipSource","type":"string"},{"name":"ipTarget","type":"string"},{"name":"urlScheme","type":"string"},{"name":"urlHost","type":"string"},{"name":"urlPort","type":"string"},{"name":"urlPath","type":"string"},{"name":"requestSize","type":"int"},{"name":"responseSize","type":"int"},{"name":"isOutsideOfficeHours","type":"boolean"},{"name":"isHostBlacklisted","type":"boolean"},{"name":"tags","type":{"type":"array","items":"string"}}]}
avro.output.schema: |
{"version":1,"type":"record","namespace":"com.hurence.logisland","name":"Event","fields":[{"name":"_type","type":"string"},{"name":"_id","type":"string"},{"name":"timestamp","type":"long"},{"name":"method","type":"string"},{"name":"ipSource","type":"string"},{"name":"ipTarget","type":"string"},{"name":"urlScheme","type":"string"},{"name":"urlHost","type":"string"},{"name":"urlPort","type":"string"},{"name":"urlPath","type":"string"},{"name":"requestSize","type":"int"},{"name":"responseSize","type":"int"},{"name":"isOutsideOfficeHours","type":"boolean"},{"name":"isHostBlacklisted","type":"boolean"},{"name":"tags","type":{"type":"array","items":"string"}}]}
# Generate random events based on an avro schema
- component: com.hurence.logisland.processor.randomgenerator.RandomEventGeneratorProcessor
type: processor
version: 0.1.0
documentation: a processor that produces random events
configuration:
kafka.input.topics: none
kafka.output.topics: logisland-mock-in
kafka.error.topics: logisland-error
min.events.count: 5
max.events.count: 100
avro.input.schema: |
{"version":1,"type":"record","namespace":"com.hurence.logisland","name":"Event","fields":[{"name":"_type","type":"string"},{"name":"_id","type":"string"},{"name":"timestamp","type":"long"},{"name":"method","type":"string"},{"name":"ipSource","type":"string"},{"name":"ipTarget","type":"string"},{"name":"urlScheme","type":"string"},{"name":"urlHost","type":"string"},{"name":"urlPort","type":"string"},{"name":"urlPath","type":"string"},{"name":"requestSize","type":"int"},{"name":"responseSize","type":"int"},{"name":"isOutsideOfficeHours","type":"boolean"},{"name":"isHostBlacklisted","type":"boolean"},{"name":"tags","type":{"type":"array","items":"string"}}]}
avro.output.schema: |
{"version":1,"type":"record","namespace":"com.hurence.logisland","name":"Event","fields":[{"name":"_type","type":"string"},{"name":"_id","type":"string"},{"name":"timestamp","type":"long"},{"name":"method","type":"string"},{"name":"ipSource","type":"string"},{"name":"ipTarget","type":"string"},{"name":"urlScheme","type":"string"},{"name":"urlHost","type":"string"},{"name":"urlPort","type":"string"},{"name":"urlPath","type":"string"},{"name":"requestSize","type":"int"},{"name":"responseSize","type":"int"},{"name":"isOutsideOfficeHours","type":"boolean"},{"name":"isHostBlacklisted","type":"boolean"},{"name":"tags","type":{"type":"array","items":"string"}}]}
One you've edited your configuration file, you can submit it to execution engine with the following cmd :
bin/process-stream.sh -conf conf/configuration-template.yml
Logisland processors are hosted in some plugins, you can create your own with a maven archetype.
git clone git@github.com:Hurence/log-island.git
cd logisland-0.9.4/logisland-plugins
mvn archetype:generate -DarchetypeGroupId=com.hurence.logisland -DarchetypeArtifactId=logisland-plugin-archetype -DarchetypeVersion=0.9.4 -DlogislandVersion=0.9.4
Define value for property 'groupId': : com.hurence.logisland
Define value for property 'artifactId': : logisland-sample-plugin
Define value for property 'version': 1.0-SNAPSHOT: : 0.1
Define value for property 'artifactBaseName': : sample
Define value for property 'package': com.hurence.logisland.sample: :
[INFO] Using property: logislandVersion = 0.9.4