- Navigate to the project folder, then run:

```bash
docker compose up -d  # run Airflow and the required components as a single service
```

- Open the Airflow UI (http://localhost:8080/home) after a short wait (~1 minute); the username and password are both `airflow`. A DAG with id `ML_data_pipeline_demo` should be deployed; track the DAG run as needed.
- To stop and remove the containers, volumes, networks, and images:

```bash
docker compose down
```
Problems 0-4 are integrated as an automated pipeline in a DAG named `ML_data_pipeline_demo`. I defined them as four sequential tasks in `my_dag.py`.

For testing purposes, the DAG is triggered manually; I have not set up any scheduled runs yet. A schedule can easily be configured as needed in the `my_dag.py` file:
```python
from datetime import datetime
from airflow import DAG

with DAG(
    dag_id='ML_data_pipeline_demo',
    start_date=datetime.now(),
    schedule_interval=None  # manual trigger only; set a cron expression here to schedule runs
) as dag:
    ...
```
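Inside the `with DAG(...)` block, the four sequential tasks might be wired up roughly as follows. This is a sketch: the callable names are assumptions based on the module names under `tasks/`, so see `my_dag.py` for the actual definitions.

```python
# Sketch of the four sequential tasks; each python_callable is assumed to be
# the entry point of the corresponding module under tasks/.
from airflow.operators.python import PythonOperator

download = PythonOperator(task_id='download_raw_data', python_callable=download_raw_data)
process = PythonOperator(task_id='process_raw_data', python_callable=process_raw_data)
features = PythonOperator(task_id='feature_engineering', python_callable=feature_engineering)
train = PythonOperator(task_id='model_training', python_callable=model_training)

download >> process >> features >> train  # sequential dependency chain
```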
- Task_id = `download_raw_data`
- Python module = `tasks/download_raw_data.py`
To fully automate the data pipeline, I used the Kaggle API to download the ETF and stocks dataset. For the convenience of testing, I posted my personal token here, which is generally not good practice; please feel free to replace it with your own API token.
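The download step might look roughly like the sketch below; the dataset slug and target folder are assumptions, so check `tasks/download_raw_data.py` for the actual call.

```python
# Sketch of the Kaggle download step; the dataset slug and path are assumptions.
from kaggle.api.kaggle_api_extended import KaggleApi

api = KaggleApi()
api.authenticate()  # reads the API token from ~/.kaggle/kaggle.json or env vars
api.dataset_download_files(
    'jacksoncrow/stock-market-dataset',  # assumed slug of the ETF/stocks dataset
    path='data/raw',                     # assumed download folder
    unzip=True,
)
```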
- Task_id = `process_raw_data`
- Python module = `tasks/process_raw_data.py`
There are 8,048 data files to process. Python's multiprocessing package does not work inside Airflow tasks, so I used billiard instead of multiprocessing to implement the parallel processing (reference 1).

I am testing on a computer with 4 cores in total, 2 of which are used for multiprocessing, so the ceiling on the performance improvement is a 50% reduction in running time. If the DAG can be tested on a computing resource with more cores, it would benefit much more from multiprocessing.
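The parallel step might look roughly like this sketch; `process_one_file` and the `data/raw` layout are assumptions, and the real implementation is in `tasks/process_raw_data.py`.

```python
# Sketch of per-file parallel processing with billiard, a drop-in
# replacement for multiprocessing that works inside Airflow tasks.
from pathlib import Path
from billiard import Pool

def process_one_file(csv_path: Path) -> None:
    """Placeholder for the per-file cleaning/processing logic."""
    ...

files = sorted(Path('data/raw').glob('**/*.csv'))
with Pool(processes=2) as pool:  # 2 of the 4 cores on the test machine
    pool.map(process_one_file, files)
```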
- Task_id = `feature_engineering`
- Python module = `tasks/feature_engineering.py`
Similarly, I implemented multiprocessing with billiard in this task; two cores run in parallel in my testing (a sketch of the two rolling calculations follows the list below). In addition, the unit tests of the feature engineering functions are saved in the `other/unittest` folder. They include six test cases in total:

- Three tests for the `vol_moving_avg` calculation: I chose a random stock, picked the first, last, and NA values from the result, and compared them with the expected values (manually calculated);
- Three tests for the `adj_close_rolling_med` calculation: I chose a random stock, picked the first, last, and NA values from the result, and compared them with the expected values (manually calculated).
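For reference, a minimal sketch of the two feature calculations, assuming 30-day rolling windows (the window length is an assumption; see `tasks/feature_engineering.py` for the actual value):

```python
# Sketch of the two engineered features; the 30-day window is an assumption.
import pandas as pd

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values('Date')
    df['vol_moving_avg'] = df['Volume'].rolling(window=30).mean()
    df['adj_close_rolling_med'] = df['Adj Close'].rolling(window=30).median()
    return df
```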
To run the unit tests, navigate to the unittest folder and run the Python script `unittest_feature_engineering_calc.py`. The result is shown in the screenshot.
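As an illustration, the NA-value check for `vol_moving_avg` might look roughly like this; the stock file path and the 30-day window are assumptions, and the real tests also compare the first and last values against manually calculated numbers.

```python
# Sketch of one of the six test cases; the path and window size are assumptions.
import unittest
import pandas as pd

class TestVolMovingAvg(unittest.TestCase):
    def test_leading_window_is_na(self):
        df = pd.read_csv('data/processed/AAPL.csv')  # hypothetical processed file
        avg = df['Volume'].rolling(window=30).mean()
        # the first 29 rows of a 30-day rolling mean must be NaN
        self.assertTrue(avg.iloc[:29].isna().all())

if __name__ == '__main__':
    unittest.main()
```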
- Task_id = `model_training`
- Python module = `tasks/ML_model_training_polyreg.py`
A linear regression model is too restrictive in this case. We only have two features, so to capture the non-linear relationship between the features and the target, I chose a polynomial regression model of degree 2. The RMSE (root-mean-square error) is much lower than with linear regression.
The resulting model is saved as `/data/trained_model/ml_model.pkl`.

The training metric (RMSE) is appended to the log file `/data/trained_model/model_training_log.pkl`. Any other metrics, loss, or error values can easily be added.
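The training step might look roughly like the sketch below; the feature file, target column, and exact pipeline are assumptions, and the actual code lives in `tasks/ML_model_training_polyreg.py`.

```python
# Sketch of degree-2 polynomial regression with scikit-learn;
# the feature file and prediction target are assumptions.
import pickle
import numpy as np
import pandas as pd
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

df = pd.read_csv('data/features/features.csv')  # hypothetical feature file
X = df[['vol_moving_avg', 'adj_close_rolling_med']].dropna()
y = df.loc[X.index, 'Volume']  # assumed prediction target

model = make_pipeline(PolynomialFeatures(degree=2), LinearRegression())
model.fit(X, y)

rmse = np.sqrt(mean_squared_error(y, model.predict(X)))

with open('data/trained_model/ml_model.pkl', 'wb') as f:
    pickle.dump(model, f)
```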
I used FastAPI to create the API. To test the API, navigate to the `/other/API` folder in a terminal, then run:

```bash
pip install -r requirements.txt
uvicorn app:app --reload  # start the server
```
The following screenshot shows the server is running:
The default route (http://127.0.0.1:8000/) is another place to confirm the server is running. Finally, we can test the model API at http://127.0.0.1:8000/docs. The following screenshot is a dummy example.
- Input in the `text_message` field: `vol_moving_avg=10&adj_close_rolling_med=20000`
- Output in the returned JSON: 860176.19
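A minimal sketch of what `app.py` might look like, assuming a single `text_message` query parameter that carries the two features as a query-style string; the route name and parsing logic are assumptions, so see `other/API/app.py` for the real code.

```python
# Sketch of the FastAPI app; the route name and parameter handling are assumptions.
import pickle
from urllib.parse import parse_qs
from fastapi import FastAPI

app = FastAPI()

with open('data/trained_model/ml_model.pkl', 'rb') as f:  # assumed model path
    model = pickle.load(f)

@app.get('/predict')  # assumed route name
def predict(text_message: str):
    params = parse_qs(text_message)
    features = [[
        float(params['vol_moving_avg'][0]),
        float(params['adj_close_rolling_med'][0]),
    ]]
    return {'prediction': float(model.predict(features)[0])}
```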
As mentioned above, only two cores run in parallel on my laptop, and the total time to run the DAG is about 30 minutes. The running time would be much reduced with more cores for data processing and feature engineering.

In detail, the running time of each task is as follows:
- Download raw data: 6 Min 16 Sec
- Process raw data: 14 Min 30 Sec (multiprocessing with 2 cores)
- Feature engineering: 7 Min 9 Sec (multiprocessing with 2 cores)
- Model training: 1 Min 57 Sec (polynomial regression is fast)
The detailed logs can be found in the `/logs/dag_id=ML_data_pipeline_demo/` folder.
- Increase the number of computing cores, so that the pipeline can fully benefit from parallel processing.
- Improve the model prediction. We can work on at least two aspects:
  - Model selection.
  - Explore more features and more ways of constructing them.
- Further streamline the process. The entire end-to-end process should roughly include six steps.

We've integrated steps 1 through 3, and part of step 4. The next improvement could be building the whole workflow into one seamless stream.