考虑这样一种WEB服务访问日志:
{
"@timestamp": "2018-01-23T11:11:39",
"clientip": "192.168.0.100",
"serverip": "10.0.0.200",
"uri": "/hello"
}
{
"@timestamp": "2018-01-23T11:11:39",
"clientip": "192.168.0.100",
"serverip": "10.0.0.200",
"uri": "/"
}
{
"@timestamp": "2018-01-23T11:11:39",
"clientip": "192.168.0.100",
"serverip": "10.0.0.201",
"uri": "/hello"
}
{
"@timestamp": "2018-01-23T11:11:39",
"clientip": "192.168.0.100",
"serverip": "10.0.0.200",
"uri": "/hello"
}
{
"@timestamp": "2018-01-23T11:11:40",
"clientip": "192.168.0.100",
"serverip": "10.0.0.200",
"uri": "/hello"
}
我需要得到这样一种聚合结果:
{@timestamp="2018-01-23T11:11:39", count=2, clientip=192.168.0.100, serverip=10.0.0.200, url=/hello}
{@timestamp="2018-01-23T11:11:40", count=1, clientip=192.168.0.100, serverip=10.0.0.201, url=/hello}
{@timestamp="2018-01-23T11:11:40", count=1, clientip=192.168.0.100, serverip=10.0.0.200, url=/}
{@timestamp="2018-01-23T11:11:40", count=1, clientip=192.168.0.100, serverip=10.0.0.200, url=/hello}
如果数据到ES之后再做聚合, 聚合的层级非常深, 很不友好, 所以希望流式的处理掉.
于是写了这样一个Filter, 配置如下:
- com.ctrip.ops.hangout.filter.LinkMetric:
fieldsLink: 'clientip->serverip->url'
timestamp: '@timestamp'
reserveWindow: 1200 #20m
batchWindow: 300 #5m
add_fields:
filtertype: metric
- fieldsLink: 层级关系, 类似于ES Aggs里面的一层层的Term Aggs, 用->分隔.
- timestamp: 日志中的时间戳字段, 如果不配置这个字段或者日志中没有这个字段, 则日志按当前处理到的时间处理. 注意: 时间戳字段需要是joda.Datetime 类型, 也就是hangout的Date filter处理过的.
- batchWindow: 多久的数据聚合在一起, 单位是秒. 300就是指5分钟内的数据做一批聚合, 而且每5分钟输出一次聚合结果.
- reserveWindow: 在内存中保留过去多久的聚合结果. 20分钟前的数据会丢弃(避免极端情况下无限使用内存). 5分钟之前, 20分钟以内的数据不会丢掉, 而是继续处理. 在下个5分钟后一起输出.
- add_fields: 可选项. 主要为聚合数据打标签, 和原始数据区分开.
- 如果时间戳比当前时间更晚, reserveWindow不会生效, 可能会导致内存使用过多.
- 如果数据不是源源不断的, 最后一批聚合数据不会输出... 因为它不能被触发.
下载https://github.com/childe/hangout-filter-linkmetric/releases/download/0.1/hangout-filters-statmetric-0.1.jar, 复制到hangout/modules下面.