/storm-nginx-log

基于Kafka、Storm的nginx日志监控,采用Apache Flume收集日志

Primary LanguageJavaMIT LicenseMIT

nginx日志监控

build status codebeat badge size platform progress

简单介绍

单机版(master branch) : 项目基于storm的实时nginx日志监控,通过读取nginx的日志文件access.log来收集nginx服务器的状态,并在一定时间内,统计访问ip的国家地址、指定时间内所有访问次数、访问的状态码、访问的站点、访问者使用的系统、访问者使用的浏览器。

集群版(cluster branch) : 项目基于Kafka storm的实时nginx日志监控,将nginx的日志文件access.log读取并放入Kafka队列中,StormSpout来对接Kafka消息队列,来收集nginx服务器的状态,并在一定时间内,统计访问ip的国家地址、指定时间内所有访问次数、访问的状态码、访问的站点、访问者使用的系统、访问者使用的浏览器。

实现思路

  1. 数据读取 StormSpout读取access.log文件,将新产生的行记录提交给SpliteBolt。 集群模式中数据读取由Kafka Producer做,Storm消费消息记录,Storm采用KafkaSpout与Kafka整合。

最新更新数据获取思路 : 借用Apache Flume获取nginx日志,设置sources为 exec / tail -F /var/log/access.log,设置sink为kafka

  1. 数据存储 storm将日志行记录划分成不同的块,其中包括ip地址块、访问时间块、访问请求信息块、访问状态码快、访问请求返回大小块、访问主机信息块。将每个块提交给CounterBolt。此外,在SpliteBolt中对行记录统计得到每天访问量等信息,存入Redis。

  2. 数据处理 Storm CounterBolt对提交来的块做对应的统计处理,将处理结果存入Redis。

集群模式逻辑结构

最新更新思路,通过使用Apache Flume收集日志,正式实现实时统计

使用Apache Flume收集日志

Flume 简要图

其中并未采用HDFS,而是用Kafka缓存,配置文件如下

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/nginx/access.log

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 2
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Storm处理过程

  1. storm分块结果
collector.emit(new Values("remote_addr", milli_time + "##" + remote_addr));
collector.emit(new Values("request", milli_time + "##" + request));
collector.emit(new Values("status", milli_time + "##" + status));
collector.emit(new Values("body_bytes_sent", milli_time + "##" + body_bytes_sent));
collector.emit(new Values("virtual_host", milli_time + "##" + virtual_host));
collector.emit(new Values("http_user_agent", milli_time + "##" + http_user_agent));
key value
remote_addr 客户端ip地址
milli_time 访问时间
request 请求的url和HTTP协议
status 请求状态码
body_bytes_sent 访问请求返回内容大小
http_user_agent 客户端系统信息
virtual_host 虚拟站点
  1. storm统计信息
  • IP统计
city = AnalyzeIP.cityOfIP(value);
counter(cityOfIP_counter, city);
  • status统计
/**    将状态码分为 1** 2** 3** 4** 5**     **/
Integer status = Integer.parseInt(value) / 100;
String statusStr = status + "**";
counter(status_counter, statusStr);
  • 客户端信息统计

    • 客户端系统类型统计
     String system = UserAgent.systemRegx(value);
     counter(system_counter, system);
    • 客户端浏览器类型统计
    String browser = UserAgent.browserRegx(value);
    counter(browser_counter, browser);
  • virtual_host统计

String regx = "([^/]*)(\\/\\/[^/]*\\/)([^ ]*)";
Pattern pattern = Pattern.compile(regx);
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
    String matcherString = matcher.group(2);
    String virtual_host = matcherString.substring(2, matcherString.length()-1);
    counter(virtualHost_counter, virtual_host);
}

伪码中的counter使用了Redis数据库的自增操作,每统计到一条记录,在Redis对应的Key上自增。具体实现如下

jedis.hincrBy("city_of_ip", city, 1);
jedis.hincrBy("status_code", statusStr, 1);
jedis.hincrBy("http_user_agent_system", system, 1);
jedis.hincrBy("http_user_agent_browser", browser, 1);
......

统计结果截图

Keys

客户端系统统计结果

每天点击量统计

状态码统计

每天请求响应文件大小

Virtual_host统计

每天访问人数统计

客户端地址统计

客户端浏览器统计

部署测试

安装使用

基础环境

  • JDK-1.8
  • Redis 4+
  • IntelliJ IDEA-2017.2
  • Nginx
  • Storm集群
  • Kafka
  • Apache Flume

下载GeoLite2IP数据库

日志分析客户端ip地址是使用GeoLite数据库查询ip所在城市,统计城市访问量。

  • 下载GeoLite2 开源数据库
  • 解压数据库包
  • 在项目的配置文件src/main/resources/application.properties中配置geolite2City.path为数据库的解压路径

安装Redis数据库

项目中使用Redis数据库存取监控信息。

  • 下载Redis数据库
  • 编译安装 make && make test && make install
  • 启动 Redis 服务端 : nohup ./redis-server &
  • 启动 Redis 客户端 : ./redis-cli
  • 测试 Redis : 127.0.0.1:6379> ping
  • 在项目的配置文件src/main/resources/application.properties中配置redis.hostredis.port

配置access.log地址

项目目前进度只读取access.log一个文件,在后面的进度中会读取所有的access日志文件

  • 在项目的配置文件src/main/resources/application.properties中配置logFile.path的路径

配置zookeeper节点

StormKafkaSpout需要配置zookeeper节点,依托zookeeper来管理kafkastorm

  • 在项目的配置文件src/main/resources/application.properties中配置zookeeper.hosts

更新说明

  • 想法很多,时间允许的情况下,会添加更多的统计方式,诸如每天的统计信息、每月的统计信息、所有记录统计信息。
  • 有待增加Web前端展示,最近很忙,暂停更新。
  • 向Goaccess看齐,也极力推荐Goaccess。https://goaccess.io/,项目精小,功能相对全面,不知道是不是自己没仔细阅读它的文档,没有发现它借用数据库,因此数据很容易丢失。但这样的好处就是使项目不占用过多系统资源。
  • 很感谢美团面试官,给我提供了一个集群模式日志获取的解决思路
  • 更新结果 : 原来版本中所有的统计是在Java代码中进行的,在Java代码中设置一个计数器,记录下计数值,然后更新进Redis。改进的统计方式是利用Redis的自增操作,将数据保留在Redis数据库中,每次获取到记录,将记录对应Redis中Key的value自增。这样的好处是JVM中不再保留计数值,相对减少了项目的内存占用。
  • 最新更新 : 使用Apache Flume收集日志,实现source exec触发日志收集,采用tail -F /var/log/nginx/access.log命令输出结果作为收集源