RocketMQ源码学习-Producer启动流程
aCoder2013 opened this issue · 0 comments
aCoder2013 commented
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
前言
最近正好在研究RocketMQ,因此打算写一些相关的博文,这是第一篇关于Producer的博客
Producer的启动流程
一个简单的Demo
/*
* 指定一个全局唯一的Group
*/
DefaultMQProducer producer = new DefaultMQProducer("hello-group");
/*
* 指定name server的地址,可以是多个,分号分隔
*/
producer.setNamesrvAddr("localhost:9876");
/*
* 启动
*/
producer.start();
内部如何工作?
接下来我们就尝试搞清楚上面那坨代码具体都干了哪些东西:
/*
* 构造函数
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
public void start() throws MQClientException {
//将所有的调用都交给这个哥们去做
this.defaultMQProducerImpl.start();
}
可以看到DefaultMQProducer
只是一个门面类,具体的实现都是由DefaultMQProducerImpl
去做的:
//默认的状态,是否可以用volatile修饰?
private ServiceState serviceState = ServiceState.CREATE_JUST;
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
/*
* 检查group的名字,不能为空也不能是默认的,因为需要全局唯一
*/
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
//如果实例名为空的话就改成进程的ID
this.defaultMQProducer.changeInstanceNameToPID();
}
//创建`MQClientFactory`实例(保存在一个map中,key的形式类似IP@进程ID)
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
/*
* 放到缓存中,组名作为Key
* ConcurrentMap<String, MQProducerInner> producerTable
*/
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
/*
* 如果组名或者Producer为空的话就会返回false,但这里不会发生;另外内部是用ConcurrentHashMap#putIfAbsent实现的,如果
* 返回的值非空,说明已经创建过,那么这里也会返回false,也就避免了并发启动的问题
*/
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
/*
* 测试用,这里缓存的结构是`ConcurrentMap<String, TopicPublishInfo>`,key是topic,也就是在这里会缓存topic的路由信息,
* 发送消息的时候也就会根据`TopicPublishInfo`的信息决定实际使用哪个queue发送
*/
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
//启动MQClientFactory
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//标记为运行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//向所有的broker发送心跳(组名)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
这段代码非常的清晰,其中有一行比较核心的,大部分初始化的工作都是这里完成的mQClientFactory.start();
,因此我们看看这里具体都是啥:
public void start() throws MQClientException {
//用synchronized修饰保证线程安全性与内存可见性
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 如果未指定的话就会通过制定的接口去获取name server的地址,超时时间是3秒
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
/*
* 启动用于通讯的客户端,内部是用Netty实现的
*/
this.mQClientAPIImpl.start();
// 启动所有的定时任务
this.startScheduledTask();
// TODO:目前还不清楚为啥生产者还需要启动一个线程专门用于拉消息
this.pullMessageService.start();
// 启动均衡消息的线程
this.rebalanceService.start();
// 启动它内部的Producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
/*
* 每两分钟抓取一次,也就是说可以通过这个服务定时摘掉挂了的broker,和心跳检测
* 双重保障
*/
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//定时更新Topic的路由信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//健康检查相关的
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
/*
* 持久化消费者当前消费的位移,这里说一下消费者的可能会踩到的坑,
* 对于同一个topic,不同group下的消费者offset是独立的,也就是
* 同一个消息会消费两次
*/
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
/*
* 根据当前的积压调优线程池的核心线程数,不过看了下实现是空的
*/
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}