/Flink_Practice

使用Flink进行电商用户行为分析实践 教程链接:https://www.bilibili.com/video/av73133501

Primary LanguageScala

用户行为分析demon(仅仅用来学习实践):

1.热门商品

	1.1 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
	1.2 过滤出点击行为数据
	1.3 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
	1.4 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品
	
测试热门TopN商品:
	kafka数据源:滑动窗口->5分钟一个窗口(若后面没有延迟,则会每5分钟输出一次往前一个小时内的统计结果)
	启动kafka:
			cd /usr/local/kafka
			./bin/zookeeper-server-start.sh config/zookeeper.properties
			./bin/kafka-server-start.sh config/server.properties
	终端启动kafka生产者: 
			./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hotitems   
	Behavior.csv:
		543462,1715,1464116,pv,1511658000  (窗口开始时间戳:1511658000 ->2017/11/26 9:0:0 ,第一个窗口结束时间:2017/11/26 9:05:0 第一个结果输出应该是5分钟后,但定时器延迟了1s才输出,则到5mins+1s后才会有结果输出)
		662867,2244074,1575622,pv,1511658060
		561558,3611281,965809,pv,1511658120
		894923,1715,1879194,pv,1511658180
		834377,2244074,3738615,pv,1511658240
		625915,3611281,570735,pv,1511658300
		578814,1715,982926,pv,1511658330
		87335,1256540,1451783,pv,1511658540
		429984,2244074,2355072,pv,1511658600
		937166,1715,2355072,pv,1511661600
	
	BUG1:
		Caused by: org.apache.kafka.common.errors.InvalidTopicException: Topic 'hotitems' is invalid 
	解决:将kafka、flink集群、应用应用程序重新启动。
	
	
	测试过程:
		543462,1715,1464116,pv,1511658000       ->2017/11/26 9:00:00
		662867,2244074,1575622,pv,1511658060    ->2017/11/26 9:01:00
		561558,3611281,965809,pv,1511658120     ->2017/11/26 9:02:00
		625915,3611281,570735,pv,1511658300     ->2017/11/26 9:05:00
		578814,1715,982926,pv,1511658301        ->2017/11/26 9:05:01
		578814,1715,982926,pv,1511658330        ->2017/11/26 9:05:30
		87335,1256540,1451783,pv,1511658540     ->2017/11/26 9:09:00
		429984,2244074,2355072,pv,1511658600    ->2017/11/26 9:10:00
		937166,1715,2355072,pv,1511661600       ->2017/11/26 10:00:00
		937170,1715,2355072,pv,1511665200  	    ->2017/11/26 11:00:00
		
	输出结果:
		其中时间为窗口结束时间             时间窗口
		第一条数据EventTime的时间戳 1511658000 ->2017/11/26 9:00:00 ,若第一条数据的EventTime的时间戳  1511658060 ->2017/11/26 9:01:00,下面时间窗口分段不变
		
		时间: 2017-11-26 09:05:00.0      [08:05:00.0,09:05:00.0)
		No1: 商品ID=1715浏览量=1       
		No2: 商品ID=3611281浏览量=1
		No3: 商品ID=2244074浏览量=1
		=============================
		时间: 2017-11-26 09:10:00.0      [08:10:00.0,09:10:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=1
		=============================
		时间: 2017-11-26 09:15:00.0      [08:15:00.0,09:15:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 09:20:00.0      [08:20:00.0,09:20:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 09:25:00.0      [08:25:00.0,09:25:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 09:30:00.0      [08:30:00.0,09:30:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=2244074浏览量=2
		No3: 商品ID=3611281浏览量=2
		=============================
		时间: 2017-11-26 09:35:00.0      [08:35:00.0,09:35:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=2244074浏览量=2
		No3: 商品ID=3611281浏览量=2
		=============================
		时间: 2017-11-26 09:40:00.0      [08:40:00.0,09:40:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=2244074浏览量=2
		No3: 商品ID=3611281浏览量=2
		=============================
		时间: 2017-11-26 09:45:00.0      [08:45:00.0,09:45:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=2244074浏览量=2
		No3: 商品ID=3611281浏览量=2
		=============================
		时间: 2017-11-26 09:50:00.0      [08:50:00.0,09:50:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 09:55:00.0      [08:55:00.0,09:55:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 10:00:00.0      [09:00:00.0,10:00:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=3611281浏览量=2
		No3: 商品ID=2244074浏览量=2
		=============================
		时间: 2017-11-26 10:05:00.0      [09:05:00.0,10:05:00.0)
		No1: 商品ID=1715浏览量=3
		No2: 商品ID=2244074浏览量=1
		No3: 商品ID=1256540浏览量=1
		=============================
		时间: 2017-11-26 10:10:00.0      [09:10:00.0,10:10:00.0)
		No1: 商品ID=1715浏览量=1
		No2: 商品ID=2244074浏览量=1
		=============================
		时间: 2017-11-26 10:15:00.0      [09:15:00.0,10:15:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:20:00.0      [09:20:00.0,10:20:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:25:00.0      [09:25:00.0,10:25:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:30:00.0      [09:30:00.0,10:30:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:35:00.0      [09:35:00.0,10:35:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:40:00.0      [09:40:00.0,10:40:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:45:00.0      [09:45:00.0,10:45:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:50:00.0      [09:50:00.0,10:50:00.0)
		No1: 商品ID=1715浏览量=1
		=============================
		时间: 2017-11-26 10:55:00.0      [09:55:00.0,10:55:00.0)
		No1: 商品ID=1715浏览量=1
		=============================

2. 实时流量统计

	子模块:NetworkFlowAnalysis
	数据源:apache.log
		字段:访问IP、用户ID、用户名、时间、请求方法、URL
		
	测试:
		右击运行NetworkFlow.scala,其中控制台输出时间为:窗口结束时间

3. 网站总流量统计(pv)

	子模块:NetworkFlowAnalysis
	
	目的:设置滚动时间窗口,实时统计每小时内的网站PV
	方式:
		1.从web服务器的日志中提取对应的页面访问然后统计(但其中的url往往是某个资源地址如网页、图片等,若对页面进行统计需要进行过滤)
		2.从埋点日志中提取用户发送的页面请求,统计总浏览量。
	数据源:UserBehavior.csv(用户埋点日志)
	实现:NetworkFlow.scala

4. UV

	子模块:NetworkFlowAnalysis
	实现:UniqueVisitor.scala
	但若数据量很大时,可以使用布隆过滤器使用位图存储(判断键值是否存在),降低存储压力。

5. 布隆过滤器实现UV统计

	数据源:redis
	
	实现:UvWithBloom.scala
	测试:
		启动redis,终端命令:redis-server;启动redis客户端:redis-cli, keys *  ->查看key

6. App市场推广统计(分渠道和不分渠道)

	子模块:MarketAnalysis
	数据源:自己生成数据源,字段[userId、行为类型、渠道、时间戳]
	实现:AppMarketingByChannel.scala(分渠道实现方式)
		AppMarketing.scala(不分渠道实现方式)
	运行:右击编译、运行

7. 页面广告分析

	7.1 页面广告点击量统计
		目的:页面广告按照省份划分的一个小时内点击量的统计,并5s更新一次,从而了解用户偏好进而进行推荐等操作
		数据源:AdClickLog.csv ->记录用户对广告的点击行为,字段[userId,广告id,省,市,时间戳]
		子模块:MarketAnalysis
		实现:AdStatisticsByGeo.scala
		运行: 右击编译、运行
		
	7.2 带黑名单的页面广告点击量统计
		目的: 带黑名单的页面点击量统计,记录用户点击次数,将连续刷单的用户加入黑名单(状态编程),并且每天将黑名单清空(定时器)
		子模块: MarketAnalysis
		实现:AdStatisticsByGeoWithBlack.scala
		运行:右击编译、运行

8. 恶意登录监控

	目的:对用户登录失败动作统计,若同一用户(可以是不同IP)在2秒之内连续两次登录失败,认为存在恶意登陆的风险,输出报警信息。
	子模块:LoginFailDetect
	数据源:LoginLog.csv [用户id,登录ip,返回类型(成功/失败),登录时间戳]
		方法一:按照用户ID分流,遇到登陆失败事件时将其保存到ListState中(状态编程),并注册定时器,设置2s后触发。
					定时器触发时(考虑无序数据,watermark延迟2s)检查状态中的登录失败事件个数,若大于等于2,就输出报警信息。
			实现:LoginFail.scala
			运行:右击编译、运行
			限制:由于watermark延迟2s输出,导致遇到连续2个登陆失败及一个登录成功的数据不会报警,读完两个失败信息会再等2s才能输出报警信息,
				而在2s内接收到成功登录数据后,会清空前面记录的2个失败状态,导致不会输出报警信息。
				
		方法二(方法一的改进):
			存储上次登录失败记录,与新接收的数据的时间戳比较,若小于2s,则输出报警信息。
			实现:LoginFailImprove.scala
			运行:右击编译、运行
			限制:只能处理连续2个失败事件才报警,并且不能处理乱序数据
		方法三(CEP):
			CEP:复杂事件处理,允许在无休止的事件流中检测事件模式,掌握数据中重要的部分
				一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,类似正则表达式匹配一样。
			
			模式:->三种模式,必须以.begin()开始
				个体模式、组合模式(必须以初始模式begin开始)、模式组
				个体模式又包含单例模式(只接受一次匹配)、循环模式(可以连续接受多个,使用量词指定循环次数)
					条件:
						组合条件:
							.where()   .or()(逻辑或) 逻辑与直接用.where()
						终止条件:
							.until()
						迭代条件:->对模式之前所有接收的事件进行处理
									.where((value,ctx)=>{...}),可以调用ctx.getEventsForPattern("name") ->"name"为以前个体模式的名称,获取到满足该模式的数据
				
				组合模式(模式序列)
					近邻:严格近邻(紧挨着)、宽松紧邻(可以不紧挨)
						严格近邻:由.next()指定,对模式"a next b",事件序列[a,c,b1,b2]不匹配
						宽松近邻:由.followedBy()指定,对模式"a followedBy b",事件序列[a,c,b1,b2]匹配为[a,b1]
						非确定性宽松近邻:由.followedByAny()指定,对模式"a followedByAny b",事件序列[a,c,b1,b2]匹配为{a,b1},{a,b2}
							
					.notNext() ->某个事件不严格紧邻前一个事件发生
					.notFollowedBy() ->某个事件不能在两个事件之间发生
					
				next.within(Time.seconds(10)) ->为模式指定时间约束,在该时间内发生匹配有效
				
				模式匹配流程:
					1.获取输入流
					2.设定匹配模式
					3.将匹配模式应用于输入流,得到patternStream对象
					4.select()/flatselect()方法从检测到的事件序列中提取事件,结果为DataStream对象
					5.超时事件提取,使用测输出流方式

9. 订单超时失效->可以用redis,也可以用Flink,提取超时事件(在的一段时间内,匹配了开头,但没匹配结尾)

	方式一:CEP实现
		目的:在15分钟内,提交订单,没支付的视为失效(解锁商品),并输出支付成功事件信息及超时事件信息(侧输出流)
		子模块:OrderPayDetect
		数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
		实现:OrderTimeout.scala  -->通过定义侧输出流标签,通过侧输出流输出超时事件,正常流输出支付成功事件。
			CEP的非紧邻匹配模式[begin...followby..]
	
	方式二:状态编程 ->
		目的:提交订单,在15分钟后输出支付结果(若15分钟内有pay,则支付成功;没有pay,则输出失败)
		子模块:OrderPayDetect
		数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
		实现:OrderTimeoutWithoutCEP.scala     create来了,启动定时器,在定时器范围内pay来了,清空定时器;若定时器时间内pay没来,则报警。
		缺点:只能在15分钟后输出信息,无法实时性。
		
	方式三(方式二的改进):->支付成功的放主流(实时输出),超时的放侧输出流
		目的:提交订单,支付成功实时输出成功信息,定时器处理超时事件
		子模块:OderPayDetect
		数据源:OderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳]
		实现:OrderTimeoutWithoutCEPImp.scala

10. 实时对账 ->connect连接两条流,coMap/coFlatMap转换成DataStream

		目的:订单支付流,与支付运营商返回日记流对账
		子模块:OrderPayDetect
		数据源:OrderLog.csv[订单id,用户行为(create/pay),pay时的交易id,时间戳] , ReceiptLog.csv[支付id,支付渠道,时间戳]
		实现:TxMacthDetect.scala
		
		改进:将两个数据流源改为socket源,端口分别为7777、8888
		测试:情动程序TxMacthDetect.scala
			启动端口: nc -lk 7777   /  nc -lk 8888
			
			测试数据7777:
				34729,create,,1558430842
				34730,create,,1558430843
				34729,pay,sd76f87d6,1558430844
				34730,pay,3hu3k2432,1558430845
				34731,create,,1558430846
				34731,pay,35jue34we,1558430849
			测试数据8888:
				ewr342as4,wechat,1558430845
				sd76f87d6,wechat,1558430847
				3hu3k2432,alipay,1558430848
				8fdsfae83,alipay,1558430850
				
			两条流连接时,watermark由时间戳小的那条流确定。
			
		其他连接两条流:Join
			Window Join:先Join,再开窗口,定义窗口函数。
			Interval Join:
		改进:使用Interval Join 连接两条流, 
			实现:TxMatchByJoin.scala