/daf-iotingestion

POC for Iot ingestion using Avro, Kafka and Spark Streaming

Primary LanguageScala

IotIngestion

POC for Iot ingestion using Avro, Kafka and Spark Streaming

Requirements

We provide a docker compose file for running the above services.

Transform all data into an internal format (i.e. DataPoint)

To be as efficient as possible, we use Apache Avro to serialize/deserialize data from and to Kafka. In particular, inspired by Osso-project, we defined a common schema, called DataPoint for any kind of input events. This schema is general enough to fit most usage data types (aka event logs, data from mobile devices, sensors, etc.).

A generic DataPoint is defined as follows:

  • Version (long) - The Osso event format version.
  • Event type ID (int) - A numeric event type.
  • Event ID (string) - An ID that uniquely identifies an event.
  • Timestamp (long) - A millisecond-accurate epoch timestamp indicating the moment the event occurred.
  • Location (string) - The location that generated the event.
  • Host (string) - The host or device that generated the event (e.g. http://opendata.5t.torino.it/get_fdt).
  • Service (string) - The service that generate the event (e.g. the uri who provides data such as ).
  • Body (byte array) - The body or “payload” of the event.
  • Tags (Map[String, String]) - the combination of a tag key-value (e.g. via=Roma, offset=500)
  • Values (Map[String, Double]) - actual measured values (e.g. free_parking=134.0)

Compile the code

1. Generate the DataPoint class

To generate a java class from an avro file, download avrohugger-tools-{version}.jar from here and exec:

java -jar avrohugger-tools_2.11-0.15.0-assembly.jar generate-specific datafile ./src/main/scala/it/teamDigitale/avro/DataPoint.avsc ./src/main/scala/

2. Create a jar file with external dependencies

Run the following command:

sbt clean compile package universal:package-bin

or the script compile_code.sh:

cd ./scripts
./compile_code.sh

Run the code

1. Run a kafka producer

Here we have implemented some example of a kafka producer which extract data from an rss feed.

The following command generate a kafka producer which extracts data from http://opendata.5t.torino.it/get_fdt:

java -Dconfig.file=./application.conf -cp "iotingestion_2.11-1.0.jar:./iotingestion-1.0/lib/*" it.teamDigitale.kafkaProducers.main.TorinoTrafficProducer

2. Run a kafka consumer

Here we have implemented some kafka consumer which stores data on HDFS and InfluxDB.

For running a kafka consumer (e.g. HDFSConsumerMain):

java -Dconfig.file=./application.conf -cp "iotingestion_2.11-1.0.jar:./iotingestion-1.0/lib/*" it.teamDigitale.consumers.sparkStreaming.main.HDFSConsumerMain

Grafana dashbBoard Examples

Analysis of the vehicular flow for Firenze

In the following, the vehicular flow of Firenze is analyzed for each available sensor. Data are extracted from Firenze open data website Image of Firenze

Analysis of average speed for Torino

In the following, the average speed of Torino is analyzed for each available sensor and grouped by streets. Data are extracted from Torino web service Image of Firenze