
Open Network Insight Ingest

Open-Network-Insight Ingest Framework

Ingested data is captured or transferred into the Hadoop cluster, where it is transformed and loaded into solution data stores.

Configure Kafka

Adding Kafka Service:

The ingest framework needs Kafka for real-time streaming. Add the Kafka service using Cloudera Manager. If you are using a Cloudera Manager version earlier than 5.4.1, you will need to add the Kafka parcel manually.

The ingest module uses a default message size of 999999 bytes. If you change this size in the ingest configuration file, you will also need to change the following Kafka configuration properties:

  • message.max.bytes
  • replica.fetch.max.bytes
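
The relationship between the ingest message size and these two broker properties can be sketched as a small helper. This is only an illustration and not part of oni-ingest: the function name `kafka_overrides` is hypothetical, and setting both properties to exactly the ingest message size is an assumption (you may prefer to leave some headroom on the broker side).

```python
# Default ingest message size in bytes, as stated above.
DEFAULT_MESSAGE_SIZE = 999999


def kafka_overrides(message_size):
    """Return the Kafka properties to review for a given ingest message size.

    An empty dict means the default size is kept and nothing needs to change.
    """
    if message_size <= 0:
        raise ValueError("message_size must be a positive number of bytes")
    if message_size == DEFAULT_MESSAGE_SIZE:
        return {}
    # Assumption: both properties are set to the same value as the ingest size.
    return {
        "message.max.bytes": message_size,
        "replica.fetch.max.bytes": message_size,
    }
```

For example, `kafka_overrides(2000000)` lists both properties with the value 2000000, which you would then apply through Cloudera Manager.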

Spark-Streaming Kafka support.

Download the following jar file: spark-streaming-kafka-0-8-assembly_2.11. This jar adds support for Spark Streaming + Kafka and must be placed in the following path: oni-ingest/oni (keeping the same file name).

Getting Started

Required Roles

The following roles are required in all the nodes where the Ingest Framework will be running.

Get the code:

 git clone https://github.com/Open-Network-Insight/oni-ingest.git

Ingest Configuration:

The file ingest_conf.json contains all the configuration required to start the ingest module:

  • dbname: Name of the Hive database where all the ingested data will be stored in avro-parquet format.
  • hdfs_app_path: Application path in HDFS where the pipelines will be stored (e.g. /user/application_user/).
  • kafka: Kafka and Zookeeper server information required to create and listen to topics and partitions.
  • pipelines: In this section you can add multiple configurations for either the same pipeline or different pipelines. Each configuration name must be lowercase without spaces (e.g. flow_internals).
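
The rules in this list can be checked before starting the ingest with a short script. The sketch below is not part of oni-ingest: the function names are hypothetical, the assumption that pipeline names may contain digits and underscores is drawn from the name flow_internals, and the four keys read from the kafka section (including the "zookeper" spelling) are the ones used in this document's configuration example.

```python
import re

# Lowercase letters, digits and underscores, no spaces. Allowing digits and
# underscores is an assumption drawn from names such as "flow_internals".
PIPELINE_NAME = re.compile(r"[a-z0-9_]+")

# Top-level keys described in the list above.
TOP_LEVEL_KEYS = ("dbname", "hdfs_app_path", "kafka", "pipelines")


def check_ingest_conf(conf):
    """Return a list of problems found in a parsed ingest_conf.json dict."""
    problems = ["missing key: " + key for key in TOP_LEVEL_KEYS if key not in conf]
    for name in conf.get("pipelines", {}):
        if not PIPELINE_NAME.fullmatch(name):
            problems.append("invalid pipeline name: " + repr(name))
    return problems


def kafka_endpoints(conf):
    """Build host:port strings for the Kafka broker and for Zookeeper."""
    kafka = conf["kafka"]
    broker = "{0}:{1}".format(kafka["kafka_server"], kafka["kafka_port"])
    # "zookeper" is spelled as in the configuration example.
    zookeeper = "{0}:{1}".format(kafka["zookeper_server"], kafka["zookeper_port"])
    return broker, zookeeper
```

A typical use would be to load ingest_conf.json with `json.load`, pass the result to `check_ingest_conf`, and only continue when the returned list is empty.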

Configuration example:

  "dbname" : "database name",
  "hdfs_app_path" : "hdfs application path",
        "kafka_server":"kafka ip",
        "kafka_port":"kafka port",
        "zookeper_server":"zk ip",
        "zookeper_port":"zk port",
          "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'"

Starting the Ingest

Running in Standalone Mode:

bash start_standalone_ingest.sh "pipeline_configuration" "num of workers"

Following the previous configuration example, starting the ingest module in standalone mode looks like:

bash start_standalone_ingest.sh flow_internals 4

Running in Cluster Mode:

Running Master: The master needs to run on the same server where the collector path is located.

python master_collector.py -t "pipeline_configuration" -w "number of workers"

Running Workers: Each worker needs to run on a server where the required processing program is installed (e.g. nfdump). Each worker must also be identified with a specific id, and ids start at 0.


  1. worker_0, id = 0
  2. worker_1, id = 1

This "id" is required to attach the worker to the Kafka partition.

python worker.py -t "pipeline_configuration" -i "id of the worker (starts with 0)" --topic "my_topic"
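
To make the id numbering concrete, the following sketch prints the master command and one worker command per id. It is an illustration rather than part of oni-ingest: the function name `cluster_commands` is hypothetical, and it assumes that the value given to -w equals the number of worker processes you start, with ids running from 0 to that number minus one.

```python
import shlex


def cluster_commands(pipeline, topic, num_workers):
    """Return the master command and the list of worker commands."""
    if num_workers < 1:
        raise ValueError("at least one worker is required")
    master = "python master_collector.py -t {0} -w {1}".format(
        shlex.quote(pipeline), num_workers
    )
    # Worker ids start at 0; each id attaches its worker to a Kafka partition.
    workers = [
        "python worker.py -t {0} -i {1} --topic {2}".format(
            shlex.quote(pipeline), worker_id, shlex.quote(topic)
        )
        for worker_id in range(num_workers)
    ]
    return master, workers


if __name__ == "__main__":
    master, workers = cluster_commands("flow_internals", "my_topic", 2)
    print(master)
    for command in workers:
        print(command)
```

Run the master command on the collector server and each worker command on a server that has the processing program installed.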