Twitter Streaming with Apache Spark, Data Mining via a Simple Flask REST API, and JupyterLab Visualization
Special thanks to Professor Mrudula Mukadam
Big Data Technology - CS 523
M.U.M - Mar 2018
- Cloudera 5.14 (8 GB RAM and 4 cores recommended)
- JDK 1.8
- Python 3.6
- Spark 2.3.0 (virtualenv)
-
Make sure that yum is up to date by running this command:
sudo yum -y update
-
Install Kafka by following the Cloudera documentation:
sudo yum -y clean all
sudo yum -y install kafka
sudo yum -y install kafka-server
-
Install Python 3.6
- Install yum-utils
sudo yum -y install yum-utils
- CentOS Development Tools
sudo yum -y groupinstall development
- Install IUS:
sudo yum -y install https://centos6.iuscommunity.org/ius-release.rpm
- Install Python 3.6
sudo yum -y install python36u
- Check install by running this command:
python3.6 -V
The output should be: Python 3.6.1
- Install the Python development package required by
happybase
:
sudo yum install python36u-devel.x86_64
- Install PIP:
sudo yum -y install python36u-pip
-
Create virtualenv with name "twitter" in /home/cloudera directory
mkdir environments
cd environments
python3.6 -m venv twitter
-
Install PySpark and other libraries into the virtualenv from the HOME folder
- Activate the "twitter" virtualenv:
source /home/cloudera/environments/twitter/bin/activate
- Upgrade setuptools:
pip install --upgrade setuptools
- Install PySpark:
pip install pyspark
- Install Jupyterlab:
pip install jupyterlab
- Install other Python lib:
pip install kafka
pip install tweepy
pip install happybase
pip install -U flask
pip install -U flask-cors
pip install pandas
python -m pip install matplotlib
-
Install Microsoft's Core Fonts for JupyterLab visualization:
sudo yum install curl cabextract xorg-x11-font-utils fontconfig
sudo rpm -i https://downloads.sourceforge.net/project/mscorefonts2/rpms/msttcore-fonts-installer-2.6-1.noarch.rpm
-
Getting Twitter API keys
- Create a Twitter account if you do not already have one.
- Go to https://apps.twitter.com/ and log in with your Twitter credentials.
- Click "Create New App".
- Fill out the form, agree to the terms, and click "Create your Twitter application".
- On the next page, click the "API keys" tab and copy your "API key" and "API secret".
- Scroll down, click "Create my access token", and copy your "Access token" and "Access token secret".
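The four credentials copied above are what twitter_stream_kafka.py needs later. One simple way to keep them together is a small config module; the names below are illustrative placeholders, not the project's actual variable names:

```python
# twitter_keys.py -- illustrative placeholder values: paste in your own
# credentials from apps.twitter.com (never commit real keys to version control)
TWITTER_KEYS = {
    "api_key": "YOUR_API_KEY",
    "api_secret": "YOUR_API_SECRET",
    "access_token": "YOUR_ACCESS_TOKEN",
    "access_token_secret": "YOUR_ACCESS_TOKEN_SECRET",
}
```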
-
Open a Terminal and start the Kafka server:
sudo service kafka-server start
-
Create Kafka topic:
kafka-topics --create --zookeeper localhost:2181 --topic twitter-stream --partitions 1 --replication-factor 1
-
Start
hbase shell
and create a new table with this structure:
- Key: id_str
- Column family user: author, location
- Column family general: lang, created, text, hashtags
- Column family place: country, country_code, name, full_name, place_type
by running this command:
create 'tweets', 'user', 'general', 'place'
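Each tweet becomes one row keyed by id_str, with its fields written under the column families above. A minimal sketch of that mapping in Python (the function name and happybase-style b'family:qualifier' keys are assumptions for illustration, not the project's exact code; only a subset of the columns is shown):

```python
def tweet_to_hbase_row(tweet):
    """Map a parsed tweet dict (Twitter API shape) to (row_key, columns)
    for the 'tweets' HBase table."""
    place = tweet.get("place") or {}
    columns = {
        b"user:author": tweet["user"]["screen_name"].encode(),
        b"user:location": (tweet["user"].get("location") or "").encode(),
        b"general:lang": tweet.get("lang", "").encode(),
        b"general:created": tweet.get("created_at", "").encode(),
        b"general:text": tweet.get("text", "").encode(),
        # hashtags flattened to a comma-separated string
        b"general:hashtags": ",".join(
            h["text"] for h in tweet.get("entities", {}).get("hashtags", [])
        ).encode(),
        b"place:country": (place.get("country") or "").encode(),
        b"place:name": (place.get("name") or "").encode(),
    }
    return tweet["id_str"].encode(), columns

# Example tweet fragment in the Twitter API shape
sample = {
    "id_str": "987654321",
    "user": {"screen_name": "alice", "location": "Fairfield, IA"},
    "lang": "en",
    "created_at": "Wed Mar 07 00:00:00 +0000 2018",
    "text": "Learning #Spark and #Kafka",
    "entities": {"hashtags": [{"text": "Spark"}, {"text": "Kafka"}]},
    "place": {"country": "United States", "name": "Fairfield"},
}
key, cols = tweet_to_hbase_row(sample)
```

With happybase, each (key, columns) pair would then go to HBase via table.put inside a batch, which is what the put/commit DEBUG messages later in this guide refer to.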
-
Start
hive
and create a new table:
CREATE EXTERNAL TABLE tweets(
  id string, user_author string, user_location string,
  general_lang string, general_created string, general_created_ts string,
  general_text string, general_hashtags string,
  place_country string, place_country_code string, place_name string,
  place_full_name string, place_place_type string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' =
  ':key,user:author,user:location,general:lang,general:created,general:created_ts,general:text,general:hashtags,place:country,place:country_code,place:name,place:full_name,place:place_type')
TBLPROPERTIES ('hbase.table.name' = 'tweets');
-
Create Hive view for casting string timestamp to timestamp:
CREATE VIEW vw_tweets AS
SELECT id, user_author, user_location, general_lang, general_created,
       from_unixtime(CAST(general_created_ts AS INT)) AS general_created_ts,
       general_text, general_hashtags, place_country, place_country_code,
       place_name, place_full_name, place_place_type
FROM tweets;
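The view's from_unixtime(CAST(general_created_ts AS INT)) converts the epoch-seconds string stored in HBase into a readable timestamp. A quick Python illustration of the same conversion (rendered in UTC here; Hive's from_unixtime uses the session time zone):

```python
from datetime import datetime, timezone

# general_created_ts is stored in HBase as a string of epoch seconds
general_created_ts = "1520380800"

# Equivalent of Hive's from_unixtime(CAST(general_created_ts AS INT))
ts = datetime.fromtimestamp(int(general_created_ts), tz=timezone.utc)
formatted = ts.strftime("%Y-%m-%d %H:%M:%S")  # -> "2018-03-07 00:00:00"
```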
-
Copy the file twitter_stream.zip to the Cloudera home directory and extract it; make sure the extracted directory path is:
/home/cloudera/twitter_stream/
-
Open the file twitter_stream_kafka.py and update the API keys.
-
Spark Submit receives the stream from Kafka and puts the data into HBase: open a new Terminal, activate the "twitter" virtualenv, and run this command:
spark-submit --master local[*] --jars /home/cloudera/twitter_stream/libs/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar /home/cloudera/twitter_stream/spark_kafka_process.py
- Debug: after starting the Twitter stream in the next step, you can search the Terminal output for the keyword DEBUG:
************************************** DEBUG: put
: data put to the HBase table but not yet committed
************************************** DEBUG: commit
: batch of rows committed to HBase
************************************** DEBUG: exception eachRDD:
: an error occurred while processing an RDD
- Track Spark jobs by opening this URL: http://quickstart.cloudera:4040/jobs/ (the URL may differ)
-
Twitter streaming sends data to Kafka: open a new Terminal, activate the "twitter" virtualenv, and run:
python /home/cloudera/twitter_stream/twitter_stream_kafka.py
-
REST API:
-
Open a new Terminal, activate the "twitter" virtualenv, and run this command:
python /home/cloudera/twitter_stream/rest_api.py
-
Test by opening this URL: http://quickstart.cloudera:5000/
-
-
In case you want to test with a small data set:
-
Open a new Terminal and start a Kafka console producer with this command:
kafka-console-producer --broker-list localhost:9092 --topic twitter-stream
-
Copy the data from the file twitter_test.json and paste it into the Terminal
-
-
Open a new Terminal and start JupyterLab:
jupyter lab --no-browser --port=8889 --ip=quickstart.cloudera
- You can access JupyterLab from the URL shown in the Terminal, e.g.: http://quickstart.cloudera:8889/?token=xxxx
-
In JupyterLab, open the file
result_virtualization.ipynb
and choose Run All Cells from the Run menu to show the visualization
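The notebook's cells roughly follow this pattern: load tweet rows into pandas and plot with matplotlib. A small self-contained sketch with inline sample data (the column names mirror the vw_tweets Hive view; the real notebook pulls its data from the REST API instead):

```python
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend so the sketch runs without a display
import matplotlib.pyplot as plt

# Sample rows shaped like the vw_tweets Hive view
rows = [
    {"general_lang": "en", "place_country": "United States"},
    {"general_lang": "en", "place_country": "India"},
    {"general_lang": "es", "place_country": "Spain"},
    {"general_lang": "en", "place_country": "United States"},
]
df = pd.DataFrame(rows)

# Count tweets per language and draw a bar chart, as the notebook does
lang_counts = df["general_lang"].value_counts()
ax = lang_counts.plot(kind="bar", title="Tweets per language")
plt.tight_layout()
plt.savefig("tweets_per_language.png")
```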
- Sometimes the HBase service dies and you must restart it with these commands:
sudo service hbase-master restart
sudo service hbase-regionserver restart
- Sometimes an exception is raised when you run
twitter_stream_kafka.py
; restart it by running the same command.
- If running
twitter_stream_kafka.py
returns a 401 code, correct your system date and time.
- Exception when running Hive:
java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------
Run this command to grant write permission:
sudo chmod -R 777 /tmp/hive
- Restart Hive service:
sudo service hive-server2 restart
- Kafka commands:
- Restart service:
sudo service kafka-server restart
- Delete topic:
kafka-topics --delete --zookeeper localhost:2181 --topic twitter-stream
- List all topics:
kafka-topics --list --zookeeper localhost:2181
- Start a producer to send data manually:
kafka-console-producer --broker-list localhost:9092 --topic twitter-stream
- HBase commands:
- Truncate table:
truncate 'tweets'
- Using Beeline instead of the Hive command:
- Enter Beeline:
beeline
- Connect to Hive server:
!connect jdbc:hive2://localhost:10000 hive cloudera
- Cloudera https://www.cloudera.com/
- Pandas visualization: https://pandas.pydata.org/pandas-docs/stable/visualization.html
- Matplotlib - pyplot: https://matplotlib.org/api/pyplot_api.html
- Spark Document: https://spark.apache.org/docs/latest/
- https://www.google.com/
- https://stackoverflow.com/