单机版(master branch)
: 项目基于storm
的实时nginx日志监控,通过读取nginx的日志文件access.log来收集nginx服务器的状态,并在一定时间内,统计访问ip的国家地址、指定时间内所有访问次数、访问的状态码、访问的站点、访问者使用的系统、访问者使用的浏览器。
集群版(cluster branch)
: 项目基于Kafka
storm
的实时nginx日志监控,将nginx的日志文件access.log读取并放入Kafka
队列中,Storm
的Spout
来对接Kafka
消息队列,来收集nginx服务器的状态,并在一定时间内,统计访问ip的国家地址、指定时间内所有访问次数、访问的状态码、访问的站点、访问者使用的系统、访问者使用的浏览器。
- 数据读取
Storm
Spout读取access.log
文件,将新产生的行记录提交给SpliteBolt。 集群模式中数据读取由Kafka Producer做,Storm消费消息记录,Storm采用KafkaSpout与Kafka整合。
最新更新数据获取思路 : 借用Apache Flume获取nginx日志,设置sources为
exec / tail -F /var/log/access.log
,设置sink为kafka
-
数据存储 storm将日志行记录划分成不同的块,其中包括ip地址块、访问时间块、访问请求信息块、访问状态码快、访问请求返回大小块、访问主机信息块。将每个块提交给CounterBolt。此外,在SpliteBolt中对行记录统计得到每天访问量等信息,存入Redis。
-
数据处理 Storm CounterBolt对提交来的块做对应的统计处理,将处理结果存入Redis。
最新更新思路,通过使用Apache Flume收集日志,正式实现实时统计
Flume 简要图
其中并未采用HDFS,而是用Kafka缓存,配置文件如下
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/nginx/access.log
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 2
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
storm
分块结果
collector.emit(new Values("remote_addr", milli_time + "##" + remote_addr));
collector.emit(new Values("request", milli_time + "##" + request));
collector.emit(new Values("status", milli_time + "##" + status));
collector.emit(new Values("body_bytes_sent", milli_time + "##" + body_bytes_sent));
collector.emit(new Values("virtual_host", milli_time + "##" + virtual_host));
collector.emit(new Values("http_user_agent", milli_time + "##" + http_user_agent));
key | value |
---|---|
remote_addr | 客户端ip地址 |
milli_time | 访问时间 |
request | 请求的url和HTTP协议 |
status | 请求状态码 |
body_bytes_sent | 访问请求返回内容大小 |
http_user_agent | 客户端系统信息 |
virtual_host | 虚拟站点 |
storm
统计信息
- IP统计
city = AnalyzeIP.cityOfIP(value);
counter(cityOfIP_counter, city);
- status统计
/** 将状态码分为 1** 2** 3** 4** 5** **/
Integer status = Integer.parseInt(value) / 100;
String statusStr = status + "**";
counter(status_counter, statusStr);
-
客户端信息统计
- 客户端系统类型统计
String system = UserAgent.systemRegx(value); counter(system_counter, system);
- 客户端浏览器类型统计
String browser = UserAgent.browserRegx(value); counter(browser_counter, browser);
-
virtual_host统计
String regx = "([^/]*)(\\/\\/[^/]*\\/)([^ ]*)";
Pattern pattern = Pattern.compile(regx);
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
String matcherString = matcher.group(2);
String virtual_host = matcherString.substring(2, matcherString.length()-1);
counter(virtualHost_counter, virtual_host);
}
伪码中的counter
使用了Redis数据库的自增操作,每统计到一条记录,在Redis对应的Key上自增。具体实现如下
jedis.hincrBy("city_of_ip", city, 1);
jedis.hincrBy("status_code", statusStr, 1);
jedis.hincrBy("http_user_agent_system", system, 1);
jedis.hincrBy("http_user_agent_browser", browser, 1);
......
- JDK-1.8
- Redis 4+
- IntelliJ IDEA-2017.2
- Nginx
- Storm集群
- Kafka
- Apache Flume
日志分析客户端ip地址是使用GeoLite数据库查询ip所在城市,统计城市访问量。
- 下载GeoLite2 开源数据库
- 解压数据库包
- 在项目的配置文件
src/main/resources/application.properties
中配置geolite2City.path
为数据库的解压路径
项目中使用Redis数据库存取监控信息。
- 下载Redis数据库
- 编译安装
make && make test && make install
- 启动 Redis 服务端 :
nohup ./redis-server &
- 启动 Redis 客户端 :
./redis-cli
- 测试 Redis :
127.0.0.1:6379> ping
- 在项目的配置文件
src/main/resources/application.properties
中配置redis.host
和redis.port
项目目前进度只读取access.log
一个文件,在后面的进度中会读取所有的access
日志文件
- 在项目的配置文件
src/main/resources/application.properties
中配置logFile.path
的路径
Storm
中KafkaSpout
需要配置zookeeper
节点,依托zookeeper
来管理kafka
和storm
- 在项目的配置文件
src/main/resources/application.properties
中配置zookeeper.hosts
- 想法很多,时间允许的情况下,会添加更多的统计方式,诸如每天的统计信息、每月的统计信息、所有记录统计信息。
- 有待增加Web前端展示,最近很忙,暂停更新。
- 向Goaccess看齐,也极力推荐Goaccess。https://goaccess.io/,项目精小,功能相对全面,不知道是不是自己没仔细阅读它的文档,没有发现它借用数据库,因此数据很容易丢失。但这样的好处就是使项目不占用过多系统资源。
- 很感谢美团面试官,给我提供了一个集群模式日志获取的解决思路
- 更新结果 : 原来版本中所有的统计是在Java代码中进行的,在Java代码中设置一个计数器,记录下计数值,然后更新进Redis。改进的统计方式是利用Redis的自增操作,将数据保留在Redis数据库中,每次获取到记录,将记录对应Redis中Key的value自增。这样的好处是JVM中不再保留计数值,相对减少了项目的内存占用。
- 最新更新 : 使用Apache Flume收集日志,实现source exec触发日志收集,采用
tail -F /var/log/nginx/access.log
命令输出结果作为收集源