The main purpose of this project is to create a streaming flow that analyzes randomly generated IPs whose behavior is matched to that of real IPs. The study includes analysis, forecasting and anomaly detection on time series data.
-
Initialization: The project also has an InitialDag that does not take place in the workflow schema. This DAG is triggered only once from Airflow and creates the initial conditions (a minimal sketch follows the list below):
- Kafka: Topics are created according to the config.json file.
- Redis Time Series Database (RTSDB): Fields are created in RTSDB using the IPs and training column types (unq_dst_ip, frequency, etc.) as keys.
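A minimal sketch of what the InitialDag tasks might look like, assuming config.json exposes a list of topic names and a list of source IPs (the "topics" and "ips" keys, the key format "<ip>:<feature>", and the connection addresses below are assumptions, not the project's real schema):

```python
# Hypothetical initialization step: create the Kafka topics from config.json and
# one RedisTimeSeries key per (IP, training column) pair.
import json

import redis
from kafka.admin import KafkaAdminClient, NewTopic  # kafka-python

with open("config.json") as f:
    config = json.load(f)

# Create the Kafka topics listed in config.json, one partition per source IP.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name=name, num_partitions=len(config["ips"]), replication_factor=1)
    for name in config["topics"]
])

# Create the RTSDB keys, e.g. "<ip>:unq_dst_ip" and "<ip>:frequency".
rts = redis.Redis(host="localhost", port=6379)
for ip in config["ips"]:
    for feature in ("unq_dst_ip", "frequency"):
        rts.ts().create(f"{ip}:{feature}")
```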
-
Data Generation (SenderDAG): Data is generated as anonymized data: the generated source IPs carry features merged from real data. Each record is stored in a single JSON file, and these files are sent to the relevant tools via an Airflow DAG. After sending, the name of the next data file is written to config.json (a minimal sender sketch follows the example file below).
Example file:
{
    "src_ip": "187.44.158.33",
    "unq_dst_ip": "3",
    _other features_
    "timestamp": "2022-01-03 09:00:00"
}
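For illustration, a minimal sender sketch, assuming config.json tracks the next file name under a hypothetical next_file key and that the JSON logs live in cl_logs/ (the real DAG may structure this differently):

```python
# Hypothetical SenderDAG task: publish the next JSON log to Kafka keyed by the
# source IP, then advance the file pointer stored in config.json.
import json
from pathlib import Path

from kafka import KafkaProducer  # kafka-python

LOG_DIR = Path("cl_logs")

with open("config.json") as f:
    config = json.load(f)

log = json.loads((LOG_DIR / config["next_file"]).read_text())

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode(),
)
# Keying by src_ip keeps per-IP ordering for downstream consumers.
producer.send("LogTopic", key=log["src_ip"].encode(), value=log)
producer.flush()

# Point config.json at the next file (here: the next name in sorted order).
files = sorted(p.name for p in LOG_DIR.glob("*.json"))
config["next_file"] = files[(files.index(config["next_file"]) + 1) % len(files)]
with open("config.json", "w") as f:
    json.dump(config, f, indent=2)
```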
-
Data Distribution and Processing (ProcessorDAG): There are two third-party tools for data distribution: Kafka and Redis.
- Kafka: Each record goes to a different partition of a single Kafka topic according to its source IP; the IP-to-partition map is built before the application runs. The Kafka data is consumed by Spark Streaming: the Spark master and worker containers always listen to LogTopic and collect data in complete mode, so no data is lost. Spark then writes the average of the unq_dst_ip feature over the collected data of each IP to another Kafka topic called SparkStreamingTopic, again with a separate partition per IP. A minimal Structured Streaming sketch follows the example below.
Example: Kafka partitions after the first two records of each IP have been sent:
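A minimal Structured Streaming sketch of this step (not the project's exact job: the project routes records to partitions through its own IP map, while this sketch relies on Kafka's default key hashing, and the container addresses are placeholders):

```python
# Read LogTopic, keep a running average of unq_dst_ip per source IP, and write
# the result to SparkStreamingTopic in complete output mode.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("log-aggregator").getOrCreate()

# Schema mirrors the example log file above.
schema = StructType([
    StructField("src_ip", StringType()),
    StructField("unq_dst_ip", StringType()),
    StructField("timestamp", StringType()),
])

logs = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "LogTopic")
    .load()
    .select(F.from_json(F.col("value").cast("string"), schema).alias("log"))
    .select("log.*")
)

averages = (
    logs.groupBy("src_ip")
    .agg(F.avg(F.col("unq_dst_ip").cast("double")).alias("avg_unq_dst_ip"))
)

query = (
    averages.select(
        F.col("src_ip").alias("key"),
        F.to_json(F.struct("src_ip", "avg_unq_dst_ip")).alias("value"),
    )
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "SparkStreamingTopic")
    .option("checkpointLocation", "/tmp/checkpoints/spark-streaming")
    .outputMode("complete")
    .start()
)
query.awaitTermination()
```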
- Redis Time Series Database (RTSDB): Each IP's data is sent to the related key of the database, so the series are kept separate. This data is used for time series forecasting with the Facebook Prophet model; because Prophet can be parallelized and grouped with Spark, each IP gets its own forecasting model (a training sketch follows the example below). After training finishes, RTSDB is cleaned to prevent the cache mechanism from being overloaded. Training is scheduled with Airflow, and the historical data removed from RTSDB is dumped to a PostgreSQL database to keep the data consistent.
Example: RTSDB before periodic cleaning of the data:
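A rough sketch of the per-IP Prophet training, assuming the dumped history has been loaded into a Spark DataFrame with src_ip, timestamp and unq_dst_ip columns (the Parquet format and the exact use of the dags/history and dags/forecasts folders are assumptions):

```python
# Fit one Prophet model per source IP using Spark's grouped pandas UDFs.
import pandas as pd
from prophet import Prophet
from pyspark.sql import SparkSession
from pyspark.sql.types import (DoubleType, StringType, StructField, StructType,
                               TimestampType)

spark = SparkSession.builder.appName("prophet-forecasts").getOrCreate()

result_schema = StructType([
    StructField("src_ip", StringType()),
    StructField("ds", TimestampType()),
    StructField("yhat", DoubleType()),
])

def forecast_ip(pdf: pd.DataFrame) -> pd.DataFrame:
    """Train a model for a single source IP and forecast the next 24 hours."""
    train = pd.DataFrame({
        "ds": pd.to_datetime(pdf["timestamp"]),
        "y": pdf["unq_dst_ip"].astype(float),
    })
    model = Prophet().fit(train)
    future = model.make_future_dataframe(periods=24, freq="H")
    forecast = model.predict(future)[["ds", "yhat"]]
    forecast["src_ip"] = pdf["src_ip"].iloc[0]
    return forecast[["src_ip", "ds", "yhat"]]

history = spark.read.parquet("dags/history")  # assumed dump location and format
forecasts = history.groupBy("src_ip").applyInPandas(forecast_ip, schema=result_schema)
forecasts.write.mode("overwrite").parquet("dags/forecasts")
```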
A Spark ML pipeline is also added to the Processor DAG as a task, using a GBT Regression model to forecast the desired IP feature. The generated ML model is saved into the src/model directory together with its pipeline stages. (There is no exact data flow, so it can be assumed that the logs.csv file is always kept up to date by the data flow.)
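A simplified sketch of that task, assuming logs.csv carries the same columns as the JSON logs and that unq_dst_ip is the forecast target (the real pipeline's feature engineering may differ):

```python
# Build time-based features from logs.csv, fit a GBT regressor, and save the
# whole pipeline (stages included) under src/model.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("gbt-forecast").getOrCreate()

logs = (
    spark.read.option("header", True).csv("data/logs.csv")
    .withColumn("unq_dst_ip", F.col("unq_dst_ip").cast("double"))
    .withColumn("ts", F.to_timestamp("timestamp"))
    .withColumn("hour", F.hour("ts"))
    .withColumn("dayofweek", F.dayofweek("ts"))
)

# The feature columns here stand in for the project's real feature set.
assembler = VectorAssembler(inputCols=["hour", "dayofweek"], outputCol="features")
gbt = GBTRegressor(featuresCol="features", labelCol="unq_dst_ip")

model = Pipeline(stages=[assembler, gbt]).fit(logs)
model.write().overwrite().save("src/model")
```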
- Data Visualization: An Apache Superset service, with the PostgreSQL package installed via its Dockerfile, has been added to the Docker environment. It will be improved by adding a forecasting visualization that compares real data with forecasted data.
Example: Visualization of Real Data from PostgreSQL (dumped RTSDB):
Note: The graphic seems to have some gaps between data points. This happens because Airflow duplicates or drops some messages outside of its scheduling mechanism (a known problem).
Bitnami's Docker distributions are used for the Kafka and Spark containers. In previous attempts with other publishers, such as the official and Confluent images, the Spark Streaming-Kafka connection did not work.
The example_files folder can be used to understand the data format, and the number of data files can be increased if desired, since there must be enough data to run the Prophet model properly. The clone operation automatically creates an empty cl_logs folder; please delete it and rename the example_files folder to cl_logs. Also, to get meaningful forecasting results, remove the dags/forecasts, dags/models and dags/history folders.
The .env file contains some important configurations for the Airflow, Spark, Kafka, Kafka Visualizer and Postgres containers. SELECTED_DATA contained the list of original data fields used before the anonymized data was constructed, so it does not need to be visible.
The source IP-to-partition-number mapping is created with the utils/map_source_ip_to_partition_number function. If desired, the data can be extended by creating JSON files that contain new IP data: run this function over the data/logs.csv file (with the new data) and add the same data entries into the cl_logs folder as JSON files to increase the number of unique source IPs in the flow.
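For illustration only, a hedged usage sketch for extending the flow with new source IPs; the import path and signature of map_source_ip_to_partition_number are assumed here, so adapt the call to the actual helper:

```python
# Rebuild the IP -> partition map from an extended logs.csv and mirror each
# record into cl_logs/ as a single JSON file (the file naming is illustrative).
import json
from pathlib import Path

import pandas as pd
from utils import map_source_ip_to_partition_number  # import path assumed

logs = pd.read_csv("data/logs.csv")
partition_map = map_source_ip_to_partition_number(logs)  # assumed to accept the logs DataFrame
print(f"{len(partition_map)} source IPs mapped to Kafka partitions")

out_dir = Path("cl_logs")
out_dir.mkdir(exist_ok=True)
for i, row in logs.iterrows():
    (out_dir / f"log_{i}.json").write_text(json.dumps(row.to_dict(), default=str))
```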
Exposed ports:
- Airflow → 8080
- Kafka → 9092
- Spark
- Master → 7077
- Internal → 8080
- External → 9090
- Kafdrop (Kafka Visualizer) → 9000
- Redis → 6379
- Redis Insight → 8001
- Superset → 8088
- Zookeeper → 2181
- Postgres
- Internal → 5432
- External → 5434
To start the system, run:
docker-compose up -d --build
The build operation is required on the first run.