BanTanger/im-whale-shark

[feature]: 线程池参数配置解释

Closed this issue · 1 comments

目前使用到线程池的代码如下:

threadPoolExecutor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS,
// 任务队列存储超过核心线程数的任务
new LinkedBlockingDeque<>(1000), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("[P2P] message-process-thread-" + num.getAndIncrement());

threadPoolExecutor.execute(() -> {
// 1. 消息持久化落库(MQ 异步) id 防重处理,通过唯一键,做一个防重处理
messageStoreServiceImpl.storeP2PMessage(messageContent);
// 2. 在异步持久化之后执行离线消息存储
OfflineMessageContent offlineMessage = getOfflineMessage(messageContent);
messageStoreServiceImpl.storeOfflineMessage(offlineMessage);
// 线程池执行消息同步,发送,回应等任务流程
doThreadPoolTask(messageContent);
// 缓存消息
messageStoreServiceImpl.setMessageCacheByMessageId(
messageContent.getAppId(), messageContent.getMessageId(), messageContent);
log.info("消息 ID [{}] 处理完成", messageContent.getMessageId());
});

private void doThreadPoolTask(MessageContent messageContent) {
// 2. 返回应答报文 ACK 给自己
ack(messageContent, ResponseVO.successResponse());
// 3. 发送消息,同步发送方多端设备
syncToSender(messageContent);
// 4. 发送消息给对方所有在线端
List<ClientInfo> clientInfos = dispatchMessage(messageContent);
// 决策前移,因为离线用户无法走消息接收逻辑,也就无法识别命令
// 这里将服务端接收确认迁移于此,保证离线用户也能实现消息可靠性
if (clientInfos.isEmpty()) {
// 如果接收方为空,代表目标用户离线,服务端代发响应 ACK 数据包
receiveAckByServer(messageContent);
}
}

可看出这些任务基本上耗时都在网络 IO 上,是 IO 密集型任务

由于 IO 密集型的 CPU 使用率较低,导致线程空闲时间很多,因此通常需要开 CPU 核心数两倍的线程,当 IO 线程空闲时,可以启动其他线程来继续使用 CPU,以提升 CPU 利用率

Netty 的 IO 处理任务就是典型的 IO 密集型任务,所以 Netty 的 Reactor 反应器实现类的 IO 处理线程数默认是 CPU 核数的两倍

image

这里由于我的 CPU 是四核,所以直接硬编码成 8,应当改进成 Netty Reactor 那种形式 Runtime.getRuntime().availableProcessors() * 2

至于为什么 corePoolSize 与 maximumPoolSize 保持一致,这是因为线程池的执行顺序是:当前线程如果没有达到核心线程数,无论怎样都开辟新线程执行任务,而达到了核心线程而没达到最大线程数,任务先进入阻塞队列等待,当任务数超过阻塞队列后才新建线程,直至达到最大线程数,如果之后还有任务,则会直接被拒绝策略拒绝(默认拒绝策略是抛出异常,当然可以自定义拒绝策略)
而任务加入阻塞队列,等待核心线程空闲,如果队列容量很大且核心线程繁忙,就会导致任务长期得不到执行
corePoolSize == maximumPoolSize,在接受新任务时,如果没有空闲工作线程,就优先创建新线程去执行任务,而不是先将任务加入阻塞队列等待现有工作线程空闲再执行

close