Consumer Library是对LogHub消费者提供的高级模式,解决多个消费者同时消费logstore时自动分配shard问题。 例如在storm、spark场景中多个消费者情况下,自动处理shard的负载均衡,消费者failover等逻辑。用户只需专注在自己业务逻辑上,而无需关心shard分配、CheckPoint、Failover等事宜。
举一个例子而言,用户需要通过storm进行流计算,启动了A、B、C 3个消费实例。在有10个shard情况下,系统会自动为A、B、C分配3、3、4个Shard进行消费。
- 当消费实例A宕机情况下,系统会把A未消费的3个Shard中数据自动均衡B、C上,当A恢复后,会重新均衡。
- 当添加实例D、E情况下,系统会自动进行均衡,每个实例消费2个Shard。
- 当Shard有Merge/Split等情况下,会根据最新的Shard信息,重新均衡。
- 当read only状态的shard消费完之后,剩余的shard会重新做负载均衡。
以上整个过程不会产生数据丢失、以及重复,用户只需在代码中做三件事情:
- 创建Consumer Group。
- 将实例名注册为Instance,并连接到Consumer Group中。
- 写处理日志的代码。
我们强烈建议使用Consumer Library进行数据消费,这样您只需要关心怎么处理数据,而不需要关注复杂的负载均衡、消费断点保存、按序消费、消费异常处理等问题。
Consumer Library中主要有4个概念,分别是consumer group、consumer、heartbeat和checkpoint,它们之间的关系如下:
-
consumer group
是logstore的子资源,拥有相同consumer group 名字的消费者共同消费同一个logstore的所有数据,这些消费者之间不会重复消费数据,一个logstore下面可以最多创建5个consumer group,不可以重名,同一个logstore下面的consumer group之间消费数据不会互相影响。consumer group有两个很重要的属性:
{
"order":boolean,
"timeout": integer
}
order属性表示是否按照写入时间顺序消费key相同的数据,timeout表示consumer group中消费者的超时时间,单位是秒,当一个消费者汇报心跳的时间间隔超过timeout,会被认为已经超时,服务端认为这个consumer此时已经下线了。
-
consumer
消费者,每个consumer上会被分配若干个shard,consumer的职责就是要消费这些shard上的数据,同一个consumer group中的consumer必须不重名。
-
heartbeat
消费者心跳,consumer需要定期向服务端汇报一个心跳包,用于表明自己还处于存活状态。
-
checkpoint
消费者定期将分配给自己的shard消费到的位置保存到服务端,这样当这个shard被分配给其它消费者时,从服务端可以获取shard的消费断点,接着从断点继续消费数据。
定义如下消费者和服务端的通信接口:
/*
创建consumer group,inOrder表示是否希望key相同的数据能够按照写入的时间顺序被消费,
timeoutInSec表示consumer的心跳超时时间,超过timeoutInSec没有汇报心跳的consumer会被认为已经下线了,建议取值20s左右。
*/
boolean CreateConsumerGroup(
String project,
String logstore,
String consumerGroupName,
boolean inOrder,
int timeoutInSec);
/*
删除consumer group
*/
boolean DeleteConsumerGroup(
String project,
String logStore,
String consumerGroup);
/*
列出logstore下所有的consumer group,包括每个consumer group的order、timeout属性
*/
List<ConsumerGroup> ListConsumerGroup(
String project,
String logStore);
/*
更新consumer group的属性,consumer group的名字不可以更新,如果将inOrder由true更新为false,
那么当前所有未开始消费的shard将会被分配至各个consumer,如果inOrder由false更新为true,对于当前正在消费的shard不会生效,
也就是说即使shard之间有父子关系,由于它们都已经被消费了,所以顺序对他们而言没有意义,但是inOrder属性更新之后,shard分裂和合并产生的新的shard将会被顺序消费。
*/
boolean UpdateConsumerGroup(
String project,
String logStore,
String consumerGroup,
boolean inOrder,
int timeoutInSec);
/*
更新shard消费到的位置,只有当参数中的shard当前由consumer持有的情况下才能更新成功
*/
boolean UpdateCheckPoint(
String project,
String logStore,
String consumerGroup,
String consumer,
int shard,
String checkpoint);
/*
更新shard消费到的位置,无论如何都可以更新成功
*/
boolean UpdateCheckPoint(
String project,
String logStore,
String consumerGroup,
int shard,
String checkpoint);
/*
获取consumer group中shard的消费断点
*/
String GetCheckPoint(
String project,
String logStore,
String consumerGroup,
int shard);
/*
获取consumer group中所有shard的消费断点,返回结果是一个断点的List,具体请参考sdk
*/
List<ShardCheckPoint> GetCheckPoint(
String project,
String logStore,
String consumerGroup);
/*
将当前consumer持有的shard汇报给服务端,服务端返回确认包,里面包含若干shard,假设持有的shard集合为A,返回的确认shard集合为B,
A和B的差集(A-B)表示consumer应该放弃消费的shard,consumer应该尽快将(A-B)中的shard的消费断点保存到服务端,并放弃消费(A-B)中的shard,
(B-A)表示consumer可以增持的shard,consumer从服务端获取(B-A)中shard的checkpoint。理想状态下下一次心跳汇报给服务端的shard集合就是B,
但是如果consumer不想(没来得及)放弃A-B中的某些shard,需要将这些shard一并汇报给服务端。
Heartbeat除了上面有消费负载均衡的功能以外,还用于告知服务端自己处于存活状态,不要将自己从consumer group中删除。一旦heartbeat超时,
服务端就会将consumer从consumer group中删除,并将分配给其的shard重新分配给别的consumer。
*/
List<Integer> HeartBeat(
String project,
String logStore,
String consumerGroup,
String consumer,
ArrayList<Integer> shards);
服务端对consumer group中每个shard都会维护一个有限状态自动机,共有五种状态,分别是already_alloc、not_alloc、wait、transfer、over,每种状态的含义如下:
already_alloc:该shard已经被某个consumer持有并消费。
not_alloc:该shard可以消费,但是尚未被任何consumer持有。
wait:该shard当前不可以消费,需要等待其祖先shard消费完。
transfer:将该shard转交给另一个消费者消费的过渡状态。只有当当前消费者在Heartbeat中放弃消费该shard,才能将该shard转交出去,因此需要这个过渡状态。
over:该shard的数据已经消费完。
wait状态需要重点说明下,假设某个时刻数据仓库所有shard的关系如下图:
初始时有3个shard 0、1、2,每个shard下面的区间表示关联的hash值的集合,这里为了简单用数值型表示hash key,此时hash值为7的数据会被写入shard 0,随后0 split成3和4,shard 0变成read_only状态,shard 3和4变成read_write状态,这个时候hash值是7的数据不会再被写入shard 0,而是写入shard 4,再接着shard 4和5 merge成6之后,hash值为7的数据只会写入shard 6。
如果要顺序消费hash值为7的数据,必须保证在shard 0被消费完之前shard 4不应该被任何消费者消费,同理,shard 6不应该在shard 4中数据消费完之前被消费,我们把shard 0、4认为是shard 6的祖先,shard 6称为shard 0和4的后代,某个shard可以被消费当且仅当其祖先shard中的数据被消费完。基于这个原因,引入wait状态表示该shard当前不可以被消费。
状态转移图如下:
图中每个状态用首字母缩写表示,每条边对应含义如下:
1 表示shard的起始状态只能是wait或者not_alloc。
2 表示该shard的祖先shard的数据已经被消费完,可以开始消费当前shard的数据了。
3 表示该shard被分配给了某一个消费者。
4 表示消费该shard的consumer心跳超时了,回收其持有的shard。
5 表示持有该shard的消费者持有的shard总数太多,不满足任意两个消费者持有shard数量之差的绝对值小于等于1,所以将该shard转移给别的需要的消费者消费,这时会将等待消费的消费者(next consumer)和这个shard关联起来。
6 表示收到持有该shard的consumer放弃消费的Heartbeat,并将该shard转移给关联的next consumer持有。发生该转移还有一种可能是该shard的next consumer超时,继续由持有该shard的消费者持有该shard。
7 表示当强制更新over状态的shard的消费断点到某个非数据结束位置时,该shard恢复可消费状态。执行这种更新操作要特别当心,因为该shard的后代shard可能已经被消费了,很可能导致数据无法按照hash key的顺序消费。
8 表示read_only状态的shard数据被消费完了。
9 表示持有该shard的消费者Heartbeat超时,回收该shard到not_alloc状态。
这里要注意以下几点:
- shard 处于transfer状态时,服务端收到持有该shard的消费者的Heartbeat时,返回的确认shard集合中不会包含该shard,确认shard集合中只会包含持有者是该consumer并且shard状态是already_alloc的shard。
- 消费者调用UpdateCheckpoint更新消费断点,如果是read_only状态的shard要检查该checkpoint是否是shard的结尾,如果是就需要将shard状态转移成over。
- 5 标识的转移是保证任意两个消费者持有shard数量之差的绝对值小于等于1的基础,当发现有consumer持有的shard数量不满足该条件时,从shard持有数量多的消费者那里剥夺一些shard,分配给持有数量少的consumer,这个过程称为消费负载均衡。这些将要易手的shard需要设置成transfer状态,以等待持有者heartbeat中确认放弃,这主要是为了让持有者收到放弃消息时将消费断点保存到服务端,从而易手之后,新的consumer可以从服务端获取该shard的消费断点。
- 新的shard加入时,只能由转移1进入,其状态为wait当该shard有祖先shard,并且其祖先没有消费完,否则状态为not_alloc。
- 消费的负载均衡只会考虑not_alloc、already_alloc、transfer状态的shard,wait和over状态的shard由于不满足消费条件,所以不会被分配给任何consumer。
- 实现Consumer Library中的两个接口:
- ILogHubProcessor // 每个shard对应一个实例,每个实例只消费特定shard的数据。
- ILogHubProcessorFactory // 负责生产实现ILogHubProcessor接口实例。
- 填写参数配置。
- 启动一个或多个client worker实例。
public static void main(String args[])
{
LogHubConfig config = new LogHubConfig(...);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//thread运行之后,client worker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
//调用worker的shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
//ClientWorker运行过程中会生成多个异步的Task,shutdown之后最好等待还在执行的Task安全退出,建议30s。
Thread.sleep(30 * 1000);
}
- 各个shard对应的消费实例类,实际开发过程中用户主要需要关注数据消费逻辑,同一个ClientWorker实例是串行消费数据的,只会产生一个ILogHubProcessor实例,ClientWorker退出的时候会调用ILogHubProcessor的shutdown函数。
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 记录上次持久化check point的时间
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消费数据的主逻辑
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
for (LogGroupData group : logGroups)
{
List<LogItem> items = group.GetAllLogs();
for (LogItem item : items)
{
// 打印loggroup中的数据
System.out.println("shard_id:" + mShardId + " " + item.ToJsonString());
}
}
long curTime = System.currentTimeMillis();
// 每隔60秒,写一次check point到服务端,如果60秒内,worker crash,
// 新启动的worker会从上一个checkpoint其消费数据,有可能有重复数据
if (curTime - mLastCheckTime > 60 * 1000)
{
try
{
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
else
{
try
{
checkPointTracker.saveCheckPoint(false);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
// 返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint()
return null;
}
// 当worker退出的时候,会调用该函数,用户可以在此处做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//将消费断点保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
- 生成 ILogHubProcessor的工厂类 :
public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一个消费实例
return new SampleLogHubProcessor();
}
}
public class LogHubConfig
{
//worker默认的拉取数据的时间间隔
public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
//consumer group的名字
private String mConsumerGroupName;
//consumer的名字,必须确保同一个consumer group下面的各个consumer不重名
private String mWorkerInstanceName;
//loghub数据接口地址
private String mLogHubEndPoint;
//项目名称
private String mProject;
//日志库名称
private String mLogStore;
//云账号的access key id
private String mAccessId;
//云账号的access key
private String mAccessKey;
//用于指出在服务端没有记录shard的checkpoint的情况下应该从什么位置消费shard,取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个
private LogHubCursorPosition mCursorPosition;
//当mCursorPosition取值为SPECIAL_TIMER_CURSOR时,指定消费时间,单位是秒。
private int mLoghubCursorStartTime = 0;
// 轮询获取loghub数据的时间间隔,间隔越小,抓取越快,单位是ms,默认是DEFAULT_DATA_FETCH_INTERVAL_MS,建议取值200ms以上
private long mDataFetchIntervalMillis;
// worker想服务端汇报心跳的时间间隔,单位是毫秒,建议取值10000ms以上。
private long mHeartBeatIntervalMillis;
//是否按序消费
private boolean mConsumeInOrder;
//SPL语句,如 *| where a = 'xxx', 详情参考:https://help.aliyun.com/zh/sls/user-guide/spl-overview
private String query;
// 是否开启请求超时
private boolean requestTimeoutEnabled;
// 请求超时时间
private int requestTimeout;
}
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.46</version>
</dependency>
- LogHubConfig 中 consumerGroupName表一个消费组,consumerGroupName相同的consumer分摊消费logstore中的shard,同一个consumerGroupName中的consumer,通过workerInstance name进行区分。
假设logstore中有shard 0 ~ shard 3 这4个shard。
有3个worker,其consumerGroupName和workerinstance name分别是 :
<consumer_group_name_1 , worker_A>,
<consumer_group_name_1 , worker_B>,
<consumer_group_name_2 , worker_C>
则,这些worker和shard的分配关系是:
<consumer_group_name_1 , worker_A>: shard_0, shard_1
<consumer_group_name_1 , worker_B>: shard_2, shard_3
<consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3 # group name不同的worker相互不影响
- 确保实现的ILogHubProcessor process()接口每次都能顺利执行,并退出,这点很重要。
- ILogHubCheckPointTracker的saveCheckPoint()接口,无论传递的参数是true,还是false,都表示当前处理的数据已经完成,参数为true,则立刻持久化至服务端,false则每隔60秒同步一次到服务端。
- LogHubConfig中配置的是子用户的accessKeyId、accessKey,需要在RAM中进行以下授权,详细内容请参考API文档:
Action | Resource |
---|---|
log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ListConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |