해킹당한 웹 서버의 로그기록을 웹 로그 마이닝을 통해 유해 봇들을 추출하고 관심 있는 사용자들의 유용한 행동 패턴을 분석하여 웹 서비스에 활용가능한 정보를 얻기 위한 프로젝트입니다.
사용된 데이터는 2019년 1월 해킹을 당했던 웹 서버의 로그기록을 통해 분석했습니다.
게시판 형태의 웹 서비스를 타깃으로 하여 교회 사용자를 Hack, Search 에이전트로 분류하고, 컨텐츠 사용자들의 컨텐츠 사용 패턴을 추출합니다.
웹 로그 분석을 위해 Hadoop, Spark 사용하며, 데이터 전처리 및 저장을 위해 Filebeat, Logstash, Elasticsearch, Kibana를 사용합니다. 분석된 데이터를 시각화하기 위해 FLASK, Apache HTTP Server 를 사용합니다.
- Windows 10
- Ubuntu 16.04 LTS
- Ubuntu 18.04 LTS
- Scala
-
- Java Install
- ssh Setting
-
- Logstash
- Elasticsearch
- Kibana
- Filebeat
- Hadoop
- Spark
- ELK Stack
- ubuntu 16.04 이상
Apache 웹 서버로부터 원시 웹 액세스 로그를 데이터 전처리 서버로 전송합니다.
전송된 로그 데이터는 Filebeat를 통해 Logstash로 전달됩니다.
전달된 원시 로그 데이터는 Logstash를 통해 필터링 되어 지리데이터를 포함하고, 데이터 포맷이 Json형식으로 바뀌어 Elasticsearch에 색인됩니다.
색인된 데이터를 통해 데이터 분석을 시작하며, Apache Hadoop와 Elasticsearch의 연동 인터페이스 ES-HADOOP을 사용하여 Elasticsearch와 Hadoop 간에 데이터를 더욱 쉽게 이동할 수 있게 할 수 있습니다.
Apache Spark 분석 엔진을 통해 로그 데이터를 분석하여 분석된 데이터를 시각화 웹 서버로 전송한 뒤 마이크로 웹 프레임워크 Flask와 Apache HTTP Server를 통해 분석된 데이터를 차트로 시각해줍니다.
Kibana를 통해 시각화하는 방법도 있습니다.
원시 접속 로그를 데이터 전처리 서버로 전송해야합니다.
아래 스크립트를 통해 전처리 서버로 전송이 가능합니다.
declare -a array
array=$(echo [SaveFolderPath]/*)
P=$(date "+%Y-%m-%d")
ssh [user]@[ip] 'mkdir [SaveFolderPath]$(date "+%Y-%m-%d")'
ssh [user]@[ip] 'rm [SaveFolderPath]/Week/*'
for A in ${array}; do
if [[ "$A" =~ "access.log" ]]; then
if [[ "$A" =~ "gz" || "$A" =~ "other" ]]; then
continue
else
echo ${A:20}
scp -rP [port] $A [user]@[ip]:[SaveFolderPath]/$(date "+%Y-%m-%d")/${A:20}
scp -rP [port] $A [user]@[ip]:[SaveFolderPath]/${A:20}
fi
fi
done
#txt file 을 만들어 두셔야합니다. 용도는 전송한 날짜 저장을 위함입니다.
ssh -p [port] [user]@[ip] 'echo $(date "+%Y-%m-%d") >> [Folderpath]/[txt file name]'
전송받은 데이터 전처리 서버는 Week 폴더에 분석을 시작할 주의 웹 로그 데이터가 들어있고, 분석전 해당 주의 폴더에 Log 데이터를 백업을합니다.
해당 스크립트는 일주일 단위로 자동으로 전송해야하므로 crontab 스케쥴링에 포함시켜줍니다.
export VISUAL=vim; crontab -e
crontab 스케쥴링시 자신이 원하는 시간, 날짜를 정해서 수정하시면됩니다.
0 12 * * 1 sh $HOME/Log-transport.sh
전처리 서버의 핵심은 원시 로그 데이터에 들어있는 필드값을 grok 필터를 통해 필트값을 가져오고, geoip 를 통해서 지리 데이터를 추가하는것입니다.
위 그림과 같이 지리 정보 데이터를 추가하여 지도를 통해 정보를 보여줄 수 있습니다.
전처리는 전단계인 원시 로그데이터가 전송한 후에 작업이 이루어져야합니다.
스케쥴링시에 약간의 시간을 두시고 스케쥴링해주세요.
분석을 시작하기 전에 웹 서버의 게시판을 크롤링해야합니다.
사용자의 접속과 게시판 연관을 위해 필요합니다.
크롤링한 데이터를 HDFS상에 저장합니다.
python3 crw.py
hadoop fs -put urldata.json /urldata/urldata.json
sbt-project-folder/
└─ src
└─ main
└─ scala
└─ scala-source
name := "AutoLog Project"
version := "1.3"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.3"
sbt package
$SPARK_HOME/bin/spark-submit --class Auto --master yarn $HOME/AutoLab/Auto/target/scala-2.11/autolog-project_2.11-1.3.jar
curl -X DELETE "[es-ip]:[es-port]/[es-index-name]?pretty"
hadoop fs -copyToLocal /Auto/$P/*.json /home/hadoop/AutoLab/Week_DF/$P/Total.json
hadoop fs -copyToLocal /Auto/Class/*.json /home/hadoop/AutoLab/Week_DF/$P/Class.json
hadoop fs -copyToLocal /Auto/Content/*.json /home/hadoop/AutoLab/Week_DF/$P/Content.json
hadoop fs -copyToLocal /Auto/Hack/*.json /home/hadoop/AutoLab/Week_DF/$P/Hack.json
hadoop fs -copyToLocal /Auto/User/*.json /home/hadoop/AutoLab/Week_DF/$P/User.json
hadoop fs -copyToLocal /Auto/Korea/*.json /home/hadoop/AutoLab/Week_DF/$P/Korea.json
hadoop fs -copyToLocal /Auto/Classfication/*.json /home/hadoop/AutoLab/Week_DF/$P/Classfication.json
hadoop fs -copyToLocal /Auto/Country/*.json /home/hadoop/AutoLab/Week_DF/$P/Country.json
hadoop fs -rm /Auto/$P/*
hadoop fs -rm /Auto/Class/*
hadoop fs -rm /Auto/Content/*
hadoop fs -rm /Auto/Hack/*
hadoop fs -rm /Auto/User/*
hadoop fs -rm /Auto/Korea/*
hadoop fs -rm /Auto/Classfication/*
hadoop fs -rm /Auto/Country/*
hadoop fs -rmdir /Auto/$P
hadoop fs -rmdir /Auto/Class
hadoop fs -rmdir /Auto/Content
hadoop fs -rmdir /Auto/Hack
hadoop fs -rmdir /Auto/User
hadoop fs -rmdir /Auto/Korea
hadoop fs -rmdir /Auto/Classfication
hadoop fs -rmdir /Auto/Country
P=$(date "+%Y-%m-%d")
ssh -p [web-server-port] [web-server-user]@[web-server-ip] "echo '[sudo pwd]' | sudo -S mkdir /var/www/html/FLASKAPPS/Data/$P"
ssh -p [web-server-port][web-server-user]@[web-server-ip] "sh /home/[user]/DataLoad.sh
P=$(date "+%Y-%m-%d")
scp -P [Hadoop-master-port] hadoop@[hadoop-master-ip]:/home/hadoop/AutoLab/Week_DF/$P/* $HOME/Data/
echo '[sudo pwd]' | sudo -S mv $HOME/Data/* /var/www/html/FLASKAPPS/Data/$P/