Distributed-System-Spark

本專案以Spark-MLlib API實現爬蟲資料缺失自動補救機制,先透過FinMind API爬取每日股票,將資料存入Influx Database,當爬蟲部分出現錯誤而無法依序取得資料時,系統將自動呼叫MLlib API存取Database資料並進行預測,再將預測結果寫回Database,最後以Plotly API 和 Dash API進行資料視覺化比較有補救與未補救的差異,以下為我們整體專案的介紹和相關API的安裝流程。

Introduction

Stock Data Crawler

每20秒call一次 FinMind API,並存到 influxDB,形成時間序列資料。當無法在時間內爬取到新資料(斷網、API故障),會視為缺失並且以資料缺失的日期為參數去 call spark 來預測這筆缺失的資料。本專案使用 2330(台積電) 為例,並以 API 回傳的 close 值為預測目標。

InfluxDB

在資料的儲存上,我們選擇使用時間序列資料庫 - InfluxDB。並將其建立在 AWS 的 EC2(Win) 上,使用公開 IP 的方式和三方程式互動。 在資料庫這部分我們設計兩個 table web_crawler_dataprediction_data,兩個 table 都會儲存爬蟲的資料,差別是 prediction_data 會額外儲存 spark 預測的缺失值。

Spark Cluster & Regression Prediction

Spark Cluster

在 spark 的叢集上,我們選擇使用 vagrant 的方式在一台電腦建立多個 VM 作為節點。其中包含一個主節點(master)、兩個工作節點(worker),每個節點分配 2g 記憶體和3顆 cpu。 在首次 vagrant up 時 master 節點用 ssh-keygen 產生公鑰,並且複製到其他 worker 節點的 /root/.ssh 底下,並在 /etc/host 底下加入各節點的 IP 資訊,方便後續叢集的連接。此外,還會自動安裝 packages 底下預先下載好的 spark 和 java 的壓縮檔,並且將 spark 和 java 環境變數加入到使用者設定檔 .bashrc 底下,讓我們可以在 user 的資料夾裡直接執行 spark 環境。

Regression prediction

  • 程式碼為spark_predict.py,程式是以pyspark為基礎撰寫,由五個自訂函數組成,函數分別為:
    • get_args(): 用來取得預設參數,包括資料缺失的日期、時間及要預測的筆數、訓練模型所需的特徵數,而特徵則為近二筆的價格。

    • get_data(): 從influxdb撈取目前時點最近20筆(data_num*2)的資料,在20筆資料中篩選出早於缺失資料的時間的近10筆資料。由於我們要預測的是股價,屬於時間序列資料,因此透過一連串資料處理,產生sliding window的資料集,作為訓練預測模型的資料。

    • predict_price(): 輸入get_data產生資料集,製成dataframe,以T(當前close)作為label,近二筆的價格作為features,透過pipeline進行資料處理及訓練,訓練模型為Linear Regression,經過訓練後預測缺失資料時間的價格,將預測結果儲存成json格式,並判斷預測結果是否有正確產生,若無則輸出False狀態。

    • write_data(): 輸入predict_price()產生的預測結果與狀態,若狀態為True,則將預測結果寫回db的prediction_data表,否則,只印出失敗訊息不做任何動作。

    • main():依序執行將前述的四大功能,並印出各階段所花時間。

Visual Design

透過Plotly套件將資料庫讀取的DataFrame資料視覺化,並利用Dash套件將視覺化圖表呈現於網站,由於資料為時間序列資料,會不停地更新,因此設定每五秒從資料庫更新視覺化資料,即時呈現資料變動。 其中上下分別為 原始爬蟲資料(web_crawler_data)經過 spark 填補的資料 (prediction_data),透過這種方式可以很值觀的比較兩者差別。

Dev Environment

Spark cluster(VM - Ubuntu)

  1. 先安裝好 Vagrantgitgit-lfs

  2. 執行下列命令:

git clone https://github.com/C-WeiYu/Distributed-System-Spark.git
cd Distributed-System-Spark
  1. 透過 Vagrantfile 建立 spark cluster VM。(可以更改 Vagrantfile 的 num_nodes 設定總共要建立幾個工作節點)
vagrant up
  1. 進入到 master 節點的 VM,啟動 master 節點。 (預設 spark-node1 為 master)
vagrant ssh spark-node1
sudo $SPARK_HOME/sbin/start-master.sh
  1. 依序進入到 worker 節點的 VM,下面以 spark-node2 為例。 (預設 spark-node2、spark-node... 為 worker)
vagrant ssh spark-node2
sudo $SPARK_HOME/sbin/start-worker.sh spark://$MASTERIP:7077
  1. 可以到 local 的電腦,進到 http://10.0.1.101:8080/ 去監控 VM 中的 Spark cluster。(10.0.1.101 為 master-node IP)

  1. 在 master 安裝所需的 python 套件。
cd Distributed-System-Spark
sudo pip3 install -r requirements.txt

InfluxDB(AWS)(Win10)

  1. 打開Windows PowerShell,貼上以下連結,安裝influxdb
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.6_windows_amd64.zip -UseBasicParsing -OutFile influxdb-1.8.6_windows_amd64.zip
Expand-Archive .\influxdb-1.8.6_windows_amd64.zip -DestinationPath 'C:\Program Files\InfluxData\influxdb\'
  1. 修改C:\Program Files\InfluxData\influxdb\influxdb-1.8.6-1(預設路徑)底下的influxdb.conf
(45行、48行) 可修改預設檔案夾路徑 
(251行) enabled = true
(260行) bind-address = ":8086"
  1. 安裝NSSM,並執行
nssm.exe install
Path的路徑為influxd.exe的檔案位置
Arguments的路徑為influxdb.conf的檔案位置
  1. 在nssm/win64的目錄下,執行
sc start (Service name)
  1. 在C:\Program Files\InfluxData\influxdb\influxdb-1.8.6-1(預設路徑)執行以下指令,啟動InfluxDB Server
influxd.exe --config influxdb.conf
  1. 在aws控制台的安全設定中,對外開放8086 port

  2. 在Windows防火牆的進階設定中,對外開放8086 port

Local computer(Win10)

  1. 建立乾淨的 python 環境。(conda 為例也可以使用 virtualenv)
conda create --name env python=3.6
conda activate env
  1. 安裝需要的套件。
pip install -r requirements.txt

Version info

軟體 版本 網址 備註
JAVA JDK8(linux、x86 64bit) OpenLogic’s OpenJDK Downloads 可使用: Java 8/11, Scala 2.12/2.13
Spark 3.2.1(hadoop3.2) Downloads | Apache Spark
Python 3.6.9
Vagrant 2.2.19 Vagrant by HashiCorp (vagrantup.com) 需先自行安裝
Ubuntu Ubuntu 18.04.6 LTS (Bionic Beaver) Discover Vagrant Boxes - Vagrant Cloud (vagrantup.com) 透過 vagrant 安裝
InfluxDB 1.8.6 InfluxDB: Open Source Time Series Database

Quick start

  1. 在開始前,請確定設定好環境(看上一章 Dev Environment)
  2. 以下命令皆在 Distributed-System-Spark 資料夾底下運行

Step1: 在 local 開啟視覺化 dashbord

python scripts/stock_dash.py

Step2: DEMO 版面配置

  • Local: 跑視覺化 dashboard,透過瀏覽器開啟。
  • Spark UI: Spark 內建,監控 spark cluster 的狀態,master node 執行後就可以在 local 的瀏覽器透過 http://10.0.1.101:8080/
  • Master: 主要節點,負責執行爬蟲程式和 pyspark 預測資料
  • Worker: 工作節點,負責執行 master 派下來的工作,這邊使用 htop 開啟效能監控,可以呈現資源使用的狀態。

Step3: master 的 VM 執行爬蟲程式

python3 scripts/crawler_version1.py
  • 註: 因目標是爬取股市資料,9:00 -13:30 才會有資料

Step4: 結束後記得關 VM

vagrant halt

Demo

Demo Video

reference

Contributors

組員 系級 學號 工作分配 github
莊崴宇 資科碩一 110753117 簡報、github C-WeiYu
何彥南 資科碩一 110753202 Spark 叢集、github、Demo 影片 aaron1aaron2
姚惠馨 資科碩一 110753135 Dashboard Hsin0705
吳仁凱 資科碩一 110753157 Pyspark(數據處理與分析)、github程式碼說明 k0341055
張修誠 資科碩一 110753165 爬蟲 juzowa
吳泓澈 資科碩一 107306009 InfluxDB Hunter107306009