/partial-key-grouping

An implementation and example of Partial Key Grouping for Apache Storm. Partial Key Grouping is a load balancing strategy for distributed stream processing systems.

Primary LanguageJavaApache License 2.0Apache-2.0

Partial Key Grouping实现

An implementation and example of Partial Key Grouping for Apache Storm. Partial Key Grouping is a load balancing strategy for distributed stream processing systems. Partial Key grouping : 和Fields Grouping类似,多了load balance,解决data skew **数据倾斜**的问题。

适用场景

在使用 Fields Grouping 的时候,按照字段 Fields 进行分发,若data中存在某些key值数据有大量的倾斜,会导致某个 task 上的数据处理量十分多,造成处理效率降低。这就是数据倾斜。 数据倾斜 具体在手机阅读BI数据中的表现:

  • 存在大量电话号码是 000000 的话单数据。如果按照Fields GroupingMSISDN进行分发数据,就会出现data skew
  • 某个省份的话单数据远远多于其它省份,比如北京的data 远远多于西藏等偏远地区。如果按照Fields GroupingProvinceID分发数据,就会出现data skew

数据倾斜表现

  • 在UI界面可以发现某个task处理的数据量远远大于其它的task。影响了Storm的处理速度。
  • 这个task的任务处理延时比其它的task处理延时高。

解决办法

Partial Key Grouping : 类似Fields grouping , 同时具有 load balanced between two downstream bolts,有效的解决了输入数据的倾斜问题. (这篇paper 解释了它的工作原理和优点。)

实现原理

yahoo Lab 实现了一个简单的示例。

  • 实现 CustomStreamGrouping, Serializable来自定义流的分组策略
  • code line 28 - 38 是核心代码。采用了2个不同的 HashFunction 来对第0个Fields进行Hash的分发,数据倾斜的数据会比较均衡的分发到2个boltIds(Task)里面,这样不会出现数据倾斜。

注意:相同的数据不是说完全均衡的分发到2个boltIds,看代码就容易明白。有一个很重要的long型数组记录每个boltIds当前处理的tuple个数,算法会保证每个boltIds之间的数据相对均衡,但是不是我们理解的shuffle 的全均衡方式。(若不明白,请看代码实现)

使用

自定义流分组策略,如何使用呢? 由于采用PKG 会出现相同的key会出现在2个task里面,所以我们需要一个聚合bolt来将这些数据给聚合起来。可以参考示例WordCount

改进

这个简单的实现,可以对自己的应用定制化的改进数据倾斜。如果想对这个组件进行更好的通用化设计,要和Fields Grouping的接口方式看齐,就是说可以传入Fields参数。代码对比见分晓:

builder.setBolt("counter", new CounterBolt(), 10).customGrouping("split", new PartialKeyGrouping());
builder.setBolt("aggregator", new AggregatorBolt(), 1).fieldsGrouping("counter", new Fields("word"));