POC for IoT ingestion using Avro, Kafka and Spark Streaming
We provide a Docker Compose file for running the above services.
To be as efficient as possible, we use Apache Avro to serialize/deserialize data to and from Kafka. In particular, inspired by the Osso project, we defined a common schema, called DataPoint, for any kind of input event. This schema is general enough to fit most kinds of data (e.g. event logs, data from mobile devices, sensors, etc.).
A generic DataPoint is defined as follows:
- Version (long) - The Osso event format version.
- Event type ID (int) - A numeric event type.
- Event ID (string) - An ID that uniquely identifies an event.
- Timestamp (long) - A millisecond-accurate epoch timestamp indicating the moment the event occurred.
- Location (string) - The location that generated the event.
- Host (string) - The host or device that generated the event (e.g. http://opendata.5t.torino.it/get_fdt).
- Service (string) - The service that generated the event (e.g. the URI that provides the data).
- Body (byte array) - The body or “payload” of the event.
- Tags (Map[String, String]) - A set of key-value tags describing the event (e.g. via=Roma, offset=500)
- Values (Map[String, Double]) - The actual measured values (e.g. free_parking=134.0)
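For illustration, here is a minimal sketch of the Scala case class that avrohugger could generate from this schema; the field names are our assumptions, and the real generated class also extends Avro's SpecificRecordBase:

```scala
package it.teamDigitale.avro

// Hypothetical sketch of the generated class: field names are
// illustrative, and the real avrohugger output also extends
// org.apache.avro.specific.SpecificRecordBase.
case class DataPoint(
  version: Long,                // Osso event format version
  eventTypeId: Int,             // numeric event type
  eventId: String,              // unique event ID
  ts: Long,                     // millisecond-accurate epoch timestamp
  location: String,             // location that generated the event
  host: String,                 // host or device that generated the event
  service: String,              // service that generated the event
  body: Array[Byte],            // raw payload
  tags: Map[String, String],    // e.g. Map("via" -> "Roma", "offset" -> "500")
  values: Map[String, Double]   // e.g. Map("free_parking" -> 134.0)
)
```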
To generate a Scala class from an Avro schema file, download avrohugger-tools-{version}.jar from here and run:
java -jar avrohugger-tools_2.11-0.15.0-assembly.jar generate-specific datafile ./src/main/scala/it/teamDigitale/avro/DataPoint.avsc ./src/main/scala/
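Once the class is generated, serializing to and deserializing from the byte arrays exchanged with Kafka could look like the following sketch (the helper names are ours; it assumes the generated DataPoint extends SpecificRecordBase, as avrohugger's generate-specific mode produces, and that no schema registry is used):

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
import it.teamDigitale.avro.DataPoint

// Hypothetical helper: encode a DataPoint to Avro binary for a Kafka record value.
def serialize(dp: DataPoint): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  val writer = new SpecificDatumWriter[DataPoint](dp.getSchema)
  writer.write(dp, encoder)
  encoder.flush()
  out.toByteArray
}

// Hypothetical helper: decode Avro binary back into a DataPoint.
def deserialize(bytes: Array[Byte]): DataPoint = {
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  val reader = new SpecificDatumReader[DataPoint](classOf[DataPoint])
  reader.read(null, decoder)
}
```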
To compile and package the project, run the following command:
sbt clean compile package universal:package-bin
or run the script compile_code.sh:
cd ./scripts
./compile_code.sh
Here we have implemented an example of a Kafka producer which extracts data from an RSS feed.
The following command runs a Kafka producer which extracts data from http://opendata.5t.torino.it/get_fdt:
java -Dconfig.file=./application.conf -cp "iotingestion_2.11-1.0.jar:./iotingestion-1.0/lib/*" it.teamDigitale.kafkaProducers.main.TorinoTrafficProducer
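The core of such a producer might look like the following sketch; the broker address, topic name and field values are illustrative, not taken from the project's application.conf, and serialize is the helper sketched above:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import it.teamDigitale.avro.DataPoint

// Hypothetical sketch: the real producer fills the DataPoint from the parsed feed.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
val dp = DataPoint(
  version = 1L,
  eventTypeId = 0,
  eventId = java.util.UUID.randomUUID().toString,
  ts = System.currentTimeMillis(),
  location = "torino",
  host = "http://opendata.5t.torino.it/get_fdt",
  service = "get_fdt",
  body = Array.empty[Byte],
  tags = Map("via" -> "Roma", "offset" -> "500"),
  values = Map("flow" -> 132.0)   // "flow" is an illustrative value key
)
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("torino-traffic", serialize(dp)))
producer.close()
```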
Here we have implemented some Kafka consumers which store data on HDFS and InfluxDB.
To run a Kafka consumer (e.g. HDFSConsumerMain):
java -Dconfig.file=./application.conf -cp "iotingestion_2.11-1.0.jar:./iotingestion-1.0/lib/*" it.teamDigitale.consumers.sparkStreaming.main.HDFSConsumerMain
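Internally, such a consumer might follow this sketch; the topic, batch interval, Kafka settings and HDFS path are illustrative, not taken from the project's configuration, and deserialize is the helper sketched earlier:

```scala
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical sketch: read Avro-encoded DataPoints from Kafka and persist
// each micro-batch under a timestamped HDFS directory.
val conf = new SparkConf().setAppName("HDFSConsumerMain")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "hdfs-consumer"
)

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[Array[Byte], Array[Byte]](Seq("torino-traffic"), kafkaParams)
)

stream
  .map(record => deserialize(record.value()))
  .foreachRDD { (rdd, time) =>
    rdd.saveAsObjectFile(s"hdfs:///data/datapoints/${time.milliseconds}")
  }

ssc.start()
ssc.awaitTermination()
```

This sketch uses the spark-streaming-kafka-0-10 direct stream API; the project's actual wiring may differ.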
In the following, the vehicular flow in Firenze is analyzed for each available sensor. Data are extracted from the Firenze open data website.
In the following, the average speed in Torino is analyzed for each available sensor and grouped by street. Data are extracted from the Torino web service.
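As a sketch of how this grouped average could be computed on a DStream of DataPoints (assuming the street name is carried in the "via" tag and the measured speed in the "speed" value; both keys are illustrative):

```scala
// Hypothetical sketch: per-batch average speed per street. Assumes
// `stream` and `deserialize` from the consumer sketch above, and that
// every DataPoint carries the "via" tag and the "speed" value.
val datapoints = stream.map(record => deserialize(record.value()))

datapoints
  .map(dp => (dp.tags("via"), (dp.values("speed"), 1)))
  .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
  .mapValues { case (sum, n) => sum / n }
  .print()
```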