This project contains two Flink streaming jobs implemented in Python:
- Data Cleaning and Publishing: Reads data from a WebSocket, performs cleanup, and publishes it to a Kafka topic.
- Data Aggregation and Enrichment: Reads data from a Kafka topic, aggregates it into a 20-second tumbling window, enriches the result with data from Worldometers, and sends it to MongoDB.
+------------------------+
| Twitter Data Simulator |
|      (generator)       |
+------------------------+
            |
            v
  +--------------------+
  |   Collection Job   |
  | (collector_stream) |
  +--------------------+
            |
            v
     +-------------+
     | Kafka Topic |
     |  (tweets)   |
     +-------------+
            |
            v
 +---------------------+      +--------------+
 |   Aggregation Job   | ---> | worldometers |
 | (aggregator_stream) |      +--------------+
 +---------------------+
            |
            v
 +----------------------+
 |       MongoDB        |
 | (twitter_collection) |
 +----------------------+
generator: A script (twitter_stream_simulator.py) used to generate dummy data and publish it to a WebSocket on localhost:5555.
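For orientation, here is a minimal sketch of such a generator, assuming the websockets package (version 11 or later, which uses single-argument handlers); the tweet fields and the one-message-per-second rate are illustrative guesses, not the exact behavior of twitter_stream_simulator.py::

import asyncio
import json
import random
import time

import websockets

FAKE_WORDS = ["covid", "vaccine", "lockdown", "recovery", "cases"]

async def stream_tweets(websocket):
    # Push one random dummy tweet per second to the connected client.
    while True:
        tweet = {
            "text": " ".join(random.choices(FAKE_WORDS, k=5)),  # assumed field
            "created_at": time.time(),                          # assumed field
        }
        await websocket.send(json.dumps(tweet))
        await asyncio.sleep(1)

async def main():
    # Serve on localhost:5555, the address the collection job reads from.
    async with websockets.serve(stream_tweets, "localhost", 5555):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())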
collector_stream: Reads data from the WebSocket, performs data cleanup, and publishes the cleaned data to the Kafka topic.
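The read, clean, and publish flow can be pictured with this simplified, non-Flink stand-in, assuming the websockets and kafka-python packages; the broker address and the clean() rule are placeholders for the job's actual configuration and cleanup logic::

import asyncio
import json

import websockets
from kafka import KafkaProducer

def clean(tweet: dict) -> dict:
    # Illustrative cleanup: normalize whitespace in the tweet text.
    tweet["text"] = " ".join(tweet.get("text", "").split())
    return tweet

async def main():
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed broker address
        value_serializer=lambda v: v.encode("utf-8"),
    )
    async with websockets.connect("ws://localhost:5555") as ws:
        async for raw in ws:
            tweet = clean(json.loads(raw))
            if tweet["text"]:  # drop tweets left empty by cleanup
                producer.send("tweets", json.dumps(tweet))

if __name__ == "__main__":
    asyncio.run(main())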
tweets: A Kafka topic where cleaned data is published by the collection job and from which data is consumed by the aggregation job.
aggregator_stream: Reads data from the Kafka topic, aggregates it into a 20-second tumbling window, enriches the data with information from Worldometers, and sends it to MongoDB.
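A minimal sketch of the windowing step, assuming PyFlink 1.16+ with the Kafka connector JAR on the classpath; fetch_worldometers_stats(), the tweet field names, and the broker address are hypothetical stand-ins, and print() stands in for the MongoDB sink used by the real job::

import json
from typing import Iterable

from pyflink.common import Time, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaSource,
)
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

def fetch_worldometers_stats() -> dict:
    # Hypothetical helper: the real job pulls current counters from
    # Worldometers; stubbed here to keep the sketch self-contained.
    return {"total_cases_count": 0, "total_deaths_count": 0}

class EnrichWindow(ProcessWindowFunction):
    def process(self, key, context, elements: Iterable[str]) -> Iterable[str]:
        tweets = [json.loads(e) for e in elements]
        record = {
            "content": [t.get("text", "") for t in tweets],  # assumed field
            "timestamp": str(context.window().end),
            **fetch_worldometers_stats(),
        }
        yield json.dumps(record)

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")  # assumed broker address
        .set_topics("tweets")
        .set_group_id("aggregator_stream")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )
    (
        env.from_source(source, WatermarkStrategy.no_watermarks(), "tweets")
        # Single logical key so one 20-second tumbling window covers all tweets.
        .key_by(lambda _: "all", key_type=Types.STRING())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .process(EnrichWindow(), output_type=Types.STRING())
        .print()  # the real job sinks to MongoDB (twitter_collection)
    )
    env.execute("aggregator_stream")

if __name__ == "__main__":
    main()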
worldometers: A resource that represents Worldometers, from which the case statistics used for enrichment are taken.
twitter_collection: The MongoDB collection into which the enriched data is saved.
- Build the Docker Images::
make docker_build
- Start the Environment::
make docker_up
- Package and deploy the project::
make clean package deploy
- In a separate terminal window, open the Flink SQL client and create a table to view the results::
make sql_client
CREATE TABLE IF NOT EXISTS twitter_table (
    `hash` STRING,
    `content` ARRAY<STRING>,
    `timestamp` STRING,
    `total_cases_count` BIGINT,
    `total_deaths_count` BIGINT,
    `total_recovered_count` BIGINT,
    `new_cases_count` BIGINT,
    `new_deaths_count` BIGINT,
    `new_recovered_count` BIGINT,
    `active_cases_count` BIGINT,
    `critical_cases_count` BIGINT,
    PRIMARY KEY (hash) NOT ENFORCED
) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://mongodb:27017',
    'database' = 'twitter_db',
    'collection' = 'twitter_collection'
);
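Note that this table holds no data of its own; it maps onto the twitter_collection that the aggregation job writes, so querying it from the SQL client reads the enriched documents directly from MongoDB.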
- Run the Collector Stream::
make remote_collection_stream
- Run the Aggregation Stream::
make remote_aggregation_stream
- To view running Flink jobs, open http://localhost:8082/#/job/running in the browser.
- To view the results, go back to the terminal window with the Flink SQL client and run the query::
SELECT * FROM twitter_table;
To refresh the results, rerun the same query and pay attention to the timestamp column.
- Shutdown the environment::
make docker_down
- Clean up::
make docker_clean