目前使用到线程池的代码如下:
|
threadPoolExecutor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, |
|
// 任务队列存储超过核心线程数的任务 |
|
new LinkedBlockingDeque<>(1000), r -> { |
|
Thread thread = new Thread(r); |
|
thread.setDaemon(true); |
|
thread.setName("[P2P] message-process-thread-" + num.getAndIncrement()); |
|
threadPoolExecutor.execute(() -> { |
|
// 1. 消息持久化落库(MQ 异步) id 防重处理,通过唯一键,做一个防重处理 |
|
messageStoreServiceImpl.storeP2PMessage(messageContent); |
|
// 2. 在异步持久化之后执行离线消息存储 |
|
OfflineMessageContent offlineMessage = getOfflineMessage(messageContent); |
|
messageStoreServiceImpl.storeOfflineMessage(offlineMessage); |
|
// 线程池执行消息同步,发送,回应等任务流程 |
|
doThreadPoolTask(messageContent); |
|
// 缓存消息 |
|
messageStoreServiceImpl.setMessageCacheByMessageId( |
|
messageContent.getAppId(), messageContent.getMessageId(), messageContent); |
|
|
|
log.info("消息 ID [{}] 处理完成", messageContent.getMessageId()); |
|
}); |
|
private void doThreadPoolTask(MessageContent messageContent) { |
|
// 2. 返回应答报文 ACK 给自己 |
|
ack(messageContent, ResponseVO.successResponse()); |
|
// 3. 发送消息,同步发送方多端设备 |
|
syncToSender(messageContent); |
|
// 4. 发送消息给对方所有在线端 |
|
List<ClientInfo> clientInfos = dispatchMessage(messageContent); |
|
// 决策前移,因为离线用户无法走消息接收逻辑,也就无法识别命令 |
|
// 这里将服务端接收确认迁移于此,保证离线用户也能实现消息可靠性 |
|
if (clientInfos.isEmpty()) { |
|
// 如果接收方为空,代表目标用户离线,服务端代发响应 ACK 数据包 |
|
receiveAckByServer(messageContent); |
|
} |
|
} |
可看出这些任务基本上耗时都在网络 IO 上,是 IO 密集型任务
由于 IO 密集型的 CPU 使用率较低,导致线程空闲时间很多,因此通常需要开 CPU 核心数两倍的线程,当 IO 线程空闲时,可以启动其他线程来继续使用 CPU,以提升 CPU 利用率
Netty 的 IO 处理任务就是典型的 IO 密集型任务,所以 Netty 的 Reactor 反应器实现类的 IO 处理线程数默认是 CPU 核数的两倍
这里由于我的 CPU 是四核,所以直接硬编码成 8,应当改进成 Netty Reactor 那种形式 Runtime.getRuntime().availableProcessors() * 2
至于为什么 corePoolSize 与 maximumPoolSize 保持一致,这是因为线程池的执行顺序是:当前线程如果没有达到核心线程数,无论怎样都开辟新线程执行任务,而达到了核心线程而没达到最大线程数,任务先进入阻塞队列等待,当任务数超过阻塞队列后才新建线程,直至达到最大线程数,如果之后还有任务,则会直接被拒绝策略拒绝(默认拒绝策略是抛出异常,当然可以自定义拒绝策略)
而任务加入阻塞队列,等待核心线程空闲,如果队列容量很大且核心线程繁忙,就会导致任务长期得不到执行
corePoolSize == maximumPoolSize,在接受新任务时,如果没有空闲工作线程,就优先创建新线程去执行任务,而不是先将任务加入阻塞队列等待现有工作线程空闲再执行