An implementation and example of Partial Key Grouping for Apache Storm.
Partial Key Grouping is a load balancing strategy for distributed stream processing systems.
Partial Key grouping : 和Fields Grouping类似,多了load balance,解决data skew **数据倾斜
**的问题。
在使用 Fields Grouping 的时候,按照字段 Fields
进行分发,若data中存在某些key
值数据有大量的倾斜,会导致某个 task 上的数据处理量十分多,造成处理效率降低。这就是数据倾斜。
数据倾斜 具体在手机阅读BI数据中的表现:
- 存在大量电话号码是 000000 的话单数据。如果按照Fields Grouping取
MSISDN
进行分发数据,就会出现data skew。 - 某个省份的话单数据远远多于其它省份,比如北京的data 远远多于西藏等偏远地区。如果按照Fields Grouping取
ProvinceID
分发数据,就会出现data skew。
- 在UI界面可以发现某个task处理的数据量远远大于其它的task。影响了Storm的处理速度。
- 这个task的任务处理延时比其它的task处理延时高。
Partial Key Grouping : 类似Fields grouping , 同时具有 load balanced between two downstream bolts,有效的解决了输入数据的倾斜问题. (这篇paper 解释了它的工作原理和优点。)
yahoo Lab 实现了一个简单的示例。
- 实现
CustomStreamGrouping
,Serializable
来自定义流的分组策略 - code line 28 - 38 是核心代码。采用了2个不同的
HashFunction
来对第0个Fields
进行Hash的分发,数据倾斜的数据会比较均衡的分发到2个boltIds
(Task)里面,这样不会出现数据倾斜。
注意:相同的数据不是说完全均衡的分发到2个
boltIds
,看代码就容易明白。有一个很重要的long
型数组记录每个boltIds
当前处理的tuple
个数,算法会保证每个boltIds之间的数据相对均衡,但是不是我们理解的shuffle 的全均衡方式。(若不明白,请看代码实现)
自定义流分组策略,如何使用呢?
由于采用PKG 会出现相同的key
会出现在2个task
里面,所以我们需要一个聚合bolt来将这些数据给聚合起来。可以参考示例WordCount。
这个简单的实现,可以对自己的应用定制化的改进数据倾斜。如果想对这个组件进行更好的通用化设计,要和Fields Grouping的接口方式看齐,就是说可以传入Fields
参数。代码对比见分晓:
builder.setBolt("counter", new CounterBolt(), 10).customGrouping("split", new PartialKeyGrouping());
builder.setBolt("aggregator", new AggregatorBolt(), 1).fieldsGrouping("counter", new Fields("word"));