1.1 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
1.2 过滤出点击行为数据
1.3 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
1.4 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品
测试热门TopN商品:
kafka数据源:滑动窗口->5分钟一个窗口(若后面没有延迟,则会每5分钟输出一次往前一个小时内的统计结果)
启动kafka:
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
终端启动kafka生产者:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hotitems
Behavior.csv:
543462,1715,1464116,pv,1511658000 (窗口开始时间戳:1511658000 ->2017/11/26 9:0:0 ,第一个窗口结束时间:2017/11/26 9:05:0 第一个结果输出应该是5分钟后,但定时器延迟了1s才输出,则到5mins+1s后才会有结果输出)
662867,2244074,1575622,pv,1511658060
561558,3611281,965809,pv,1511658120
894923,1715,1879194,pv,1511658180
834377,2244074,3738615,pv,1511658240
625915,3611281,570735,pv,1511658300
578814,1715,982926,pv,1511658330
87335,1256540,1451783,pv,1511658540
429984,2244074,2355072,pv,1511658600
937166,1715,2355072,pv,1511661600
BUG1:
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Topic 'hotitems' is invalid
解决:将kafka、flink集群、应用应用程序重新启动。
测试过程:
543462,1715,1464116,pv,1511658000 ->2017/11/26 9:00:00
662867,2244074,1575622,pv,1511658060 ->2017/11/26 9:01:00
561558,3611281,965809,pv,1511658120 ->2017/11/26 9:02:00
625915,3611281,570735,pv,1511658300 ->2017/11/26 9:05:00
578814,1715,982926,pv,1511658301 ->2017/11/26 9:05:01
578814,1715,982926,pv,1511658330 ->2017/11/26 9:05:30
87335,1256540,1451783,pv,1511658540 ->2017/11/26 9:09:00
429984,2244074,2355072,pv,1511658600 ->2017/11/26 9:10:00
937166,1715,2355072,pv,1511661600 ->2017/11/26 10:00:00
937170,1715,2355072,pv,1511665200 ->2017/11/26 11:00:00
输出结果:
其中时间为窗口结束时间 时间窗口
第一条数据EventTime的时间戳 1511658000 ->2017/11/26 9:00:00 ,若第一条数据的EventTime的时间戳 1511658060 ->2017/11/26 9:01:00,下面时间窗口分段不变
时间: 2017-11-26 09:05:00.0 [08:05:00.0,09:05:00.0)
No1: 商品ID=1715浏览量=1
No2: 商品ID=3611281浏览量=1
No3: 商品ID=2244074浏览量=1
=============================
时间: 2017-11-26 09:10:00.0 [08:10:00.0,09:10:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=1
=============================
时间: 2017-11-26 09:15:00.0 [08:15:00.0,09:15:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 09:20:00.0 [08:20:00.0,09:20:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 09:25:00.0 [08:25:00.0,09:25:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 09:30:00.0 [08:30:00.0,09:30:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=2244074浏览量=2
No3: 商品ID=3611281浏览量=2
=============================
时间: 2017-11-26 09:35:00.0 [08:35:00.0,09:35:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=2244074浏览量=2
No3: 商品ID=3611281浏览量=2
=============================
时间: 2017-11-26 09:40:00.0 [08:40:00.0,09:40:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=2244074浏览量=2
No3: 商品ID=3611281浏览量=2
=============================
时间: 2017-11-26 09:45:00.0 [08:45:00.0,09:45:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=2244074浏览量=2
No3: 商品ID=3611281浏览量=2
=============================
时间: 2017-11-26 09:50:00.0 [08:50:00.0,09:50:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 09:55:00.0 [08:55:00.0,09:55:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 10:00:00.0 [09:00:00.0,10:00:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=3611281浏览量=2
No3: 商品ID=2244074浏览量=2
=============================
时间: 2017-11-26 10:05:00.0 [09:05:00.0,10:05:00.0)
No1: 商品ID=1715浏览量=3
No2: 商品ID=2244074浏览量=1
No3: 商品ID=1256540浏览量=1
=============================
时间: 2017-11-26 10:10:00.0 [09:10:00.0,10:10:00.0)
No1: 商品ID=1715浏览量=1
No2: 商品ID=2244074浏览量=1
=============================
时间: 2017-11-26 10:15:00.0 [09:15:00.0,10:15:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:20:00.0 [09:20:00.0,10:20:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:25:00.0 [09:25:00.0,10:25:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:30:00.0 [09:30:00.0,10:30:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:35:00.0 [09:35:00.0,10:35:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:40:00.0 [09:40:00.0,10:40:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:45:00.0 [09:45:00.0,10:45:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:50:00.0 [09:50:00.0,10:50:00.0)
No1: 商品ID=1715浏览量=1
=============================
时间: 2017-11-26 10:55:00.0 [09:55:00.0,10:55:00.0)
No1: 商品ID=1715浏览量=1
=============================
子模块:NetworkFlowAnalysis
数据源:apache.log
字段:访问IP、用户ID、用户名、时间、请求方法、URL
测试:
右击运行NetworkFlow.scala,其中控制台输出时间为:窗口结束时间
子模块:NetworkFlowAnalysis
目的:设置滚动时间窗口,实时统计每小时内的网站PV
方式:
1.从web服务器的日志中提取对应的页面访问然后统计(但其中的url往往是某个资源地址如网页、图片等,若对页面进行统计需要进行过滤)
2.从埋点日志中提取用户发送的页面请求,统计总浏览量。
数据源:UserBehavior.csv(用户埋点日志)
实现:NetworkFlow.scala
子模块:NetworkFlowAnalysis
实现:UniqueVisitor.scala
但若数据量很大时,可以使用布隆过滤器使用位图存储(判断键值是否存在),降低存储压力。
数据源:redis
实现:UvWithBloom.scala
测试:
启动redis,终端命令:redis-server;启动redis客户端:redis-cli, keys * ->查看key
子模块:MarketAnalysis
数据源:自己生成数据源,字段[userId、行为类型、渠道、时间戳]
实现:AppMarketingByChannel.scala(分渠道实现方式)
AppMarketing.scala(不分渠道实现方式)
运行:右击编译、运行
7.1 页面广告点击量统计
目的:页面广告按照省份划分的一个小时内点击量的统计,并5s更新一次,从而了解用户偏好进而进行推荐等操作
数据源:AdClickLog.csv ->记录用户对广告的点击行为,字段[userId,广告id,省,市,时间戳]
子模块:MarketAnalysis
实现:AdStatisticsByGeo.scala
运行: 右击编译、运行
7.2 带黑名单的页面广告点击量统计
目的: 带黑名单的页面点击量统计,记录用户点击次数,将连续刷单的用户加入黑名单(状态编程),并且每天将黑名单清空(定时器)
子模块: MarketAnalysis
实现:AdStatisticsByGeoWithBlack.scala
运行:右击编译、运行
目的:对用户登录失败动作统计,若同一用户(可以是不同IP)在2秒之内连续两次登录失败,认为存在恶意登陆的风险,输出报警信息。
子模块:LoginFailDetect
数据源:LoginLog.csv [用户id,登录ip,返回类型(成功/失败),登录时间戳]
方法一:按照用户ID分流,遇到登陆失败事件时将其保存到ListState中(状态编程),并注册定时器,设置2s后触发。
定时器触发时(考虑无序数据,watermark延迟2s)检查状态中的登录失败事件个数,若大于等于2,就输出报警信息。
实现:LoginFail.scala
运行:右击编译、运行
限制:由于watermark延迟2s输出,导致遇到连续2个登陆失败及一个登录成功的数据不会报警,读完两个失败信息会再等2s才能输出报警信息,
而在2s内接收到成功登录数据后,会清空前面记录的2个失败状态,导致不会输出报警信息。
方法二(方法一的改进):
存储上次登录失败记录,与新接收的数据的时间戳比较,若小于2s,则输出报警信息。
实现:LoginFailImprove.scala
运行:右击编译、运行
限制:只能处理连续2个失败事件才报警,并且不能处理乱序数据
方法三(CEP):
CEP:复杂事件处理,允许在无休止的事件流中检测事件模式,掌握数据中重要的部分
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,类似正则表达式匹配一样。
模式:->三种模式,必须以.begin()开始
个体模式、组合模式(必须以初始模式begin开始)、模式组
个体模式又包含单例模式(只接受一次匹配)、循环模式(可以连续接受多个,使用量词指定循环次数)
条件:
组合条件:
.where() .or()(逻辑或) 逻辑与直接用.where()
终止条件:
.until()
迭代条件:->对模式之前所有接收的事件进行处理
.where((value,ctx)=>{...}),可以调用ctx.getEventsForPattern("name") ->"name"为以前个体模式的名称,获取到满足该模式的数据
组合模式(模式序列)
近邻:严格近邻(紧挨着)、宽松紧邻(可以不紧挨)
严格近邻:由.next()指定,对模式"a next b",事件序列[a,c,b1,b2]不匹配
宽松近邻:由.followedBy()指定,对模式"a followedBy b",事件序列[a,c,b1,b2]匹配为[a,b1]
非确定性宽松近邻:由.followedByAny()指定,对模式"a followedByAny b",事件序列[a,c,b1,b2]匹配为{a,b1},{a,b2}
.notNext() ->某个事件不严格紧邻前一个事件发生
.notFollowedBy() ->某个事件不能在两个事件之间发生
next.within(Time.seconds(10)) ->为模式指定时间约束,在该时间内发生匹配有效
模式匹配流程:
1.获取输入流
2.设定匹配模式
3.将匹配模式应用于输入流,得到patternStream对象
4.select()/flatselect()方法从检测到的事件序列中提取事件,结果为DataStream对象
5.超时事件提取,使用测输出流方式
9. 订单超时失效->可以用redis,也可以用Flink,提取超时事件(在的一段时间内,匹配了开头,但没匹配结尾)
方式一:CEP实现
目的:在15分钟内,提交订单,没支付的视为失效(解锁商品),并输出支付成功事件信息及超时事件信息(侧输出流)
子模块:OrderPayDetect
数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
实现:OrderTimeout.scala -->通过定义侧输出流标签,通过侧输出流输出超时事件,正常流输出支付成功事件。
CEP的非紧邻匹配模式[begin...followby..]
方式二:状态编程 ->
目的:提交订单,在15分钟后输出支付结果(若15分钟内有pay,则支付成功;没有pay,则输出失败)
子模块:OrderPayDetect
数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
实现:OrderTimeoutWithoutCEP.scala create来了,启动定时器,在定时器范围内pay来了,清空定时器;若定时器时间内pay没来,则报警。
缺点:只能在15分钟后输出信息,无法实时性。
方式三(方式二的改进):->支付成功的放主流(实时输出),超时的放侧输出流
目的:提交订单,支付成功实时输出成功信息,定时器处理超时事件
子模块:OderPayDetect
数据源:OderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
实现:OrderTimeoutWithoutCEPImp.scala
10. 实时对账 ->connect连接两条流,coMap/coFlatMap转换成DataStream
目的:订单支付流,与支付运营商返回日记流对账
子模块:OrderPayDetect
数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳] , ReceiptLog.csv[支付id,支付渠道,时间戳]
实现:TxMacthDetect.scala
改进:将两个数据流源改为socket源,端口分别为7777、8888
测试:情动程序TxMacthDetect.scala
启动端口: nc -lk 7777 / nc -lk 8888
测试数据7777:
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
测试数据8888:
ewr342as4,wechat,1558430845
sd76f87d6,wechat,1558430847
3hu3k2432,alipay,1558430848
8fdsfae83,alipay,1558430850
两条流连接时,watermark由时间戳小的那条流确定。
其他连接两条流:Join
Window Join:先Join,再开窗口,定义窗口函数。
Interval Join:
改进:使用Interval Join 连接两条流,
实现:TxMatchByJoin.scala