高并发场景下取消订阅时报 ConcurrentModificationException
yinyuncan opened this issue · 3 comments
yinyuncan commented
Please answer some questions before submitting your issue. Thanks!
Which version of Mica do you using(使用的 mica 版本)?
2.1.2
Expected behavior(预期的效果)
取消订阅时不应该报ConcurrentModificationException
Actual behavior(实际的效果)
java.util.ConcurrentModificationException: null
at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
at java.base/java.util.LinkedHashMap$LinkedValueIterator.next(LinkedHashMap.java:783)
at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientSession.getMatchedSubscription(DefaultMqttClientSession.java:135)
at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientProcessor.invokeListenerForPublish(DefaultMqttClientProcessor.java:333)
at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientProcessor.processPublish(DefaultMqttClientProcessor.java:214)
at net.dreamlu.iot.mqtt.core.client.MqttClientAioHandler.handler(MqttClientAioHandler.java:82)
at org.tio.core.task.HandlerRunnable.handler(HandlerRunnable.java:257)
at org.tio.core.task.DecodeRunnable.handler(DecodeRunnable.java:260)
at org.tio.core.task.DecodeRunnable.decode(DecodeRunnable.java:369)
at org.tio.core.ReadCompletionHandler.completed(ReadCompletionHandler.java:252)
at org.tio.core.ReadCompletionHandler.completed(ReadCompletionHandler.java:209)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:221)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Steps to reproduce the behavior(重现步骤)
// 根据上一个异步操作的结果,进行下一步操作
mono.flatMapMany(result -> {
if (result.isSuccess()) {
FaceQueryResponse faceQueryResponse = (FaceQueryResponse) result.getData();
// 查询所有的人脸数据,从下标0开始
return Flux.range(0, faceQueryResponse.count()).publishOn(Schedulers.boundedElastic()).map(i -> {
// 构建查询人脸详情数据对象
String responseDetailTopic = String.format("face/%s/response", clientId.get());
int finalI = i;
// 人脸详情数据处理
Mono<R> detailMono = Mono.create(monoSink -> {
ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
monoSink.success(R.fail("设备通讯超时,人脸详情数据同步失败!"));
}, 4, TimeUnit.SECONDS);
log.info("第二层进来了:{}",responseDetailTopic);
mqttTemplate.subQos0(responseDetailTopic, (context2, topic2, message2, payload2) -> {
// 取消定时任务
scheduledFuture.cancel(true);
String responseDetailPayload = ByteBufferUtil.toString(payload2);
log.info("第2层查询返回 response: {}", responseDetailPayload);
FaceQueryResponse faceDetailResponse = JsonUtil.parse(responseDetailPayload, FaceQueryResponse.class);
}
});
});
// 取消详情订阅
detailMono.subscribe((r) -> {
log.info("取消订阅:{}",responseDetailTopic);
// 取消订阅MQTT响应的主题
mqttTemplate.unSubscribe(responseDetailTopic);
});
return detailMono;
});
}
log.info("错误just返回 result: {}", result);
return Mono.just(result);
}).collectList().flatMap(resultList -> {
// 处理所有同步操作的结果
log.info("resultList: {}", resultList);
return Mono.just(resultList);
})
错误出现在 mqttTemplate.unSubscribe(responseDetailTopic); 取消订阅的时候抛出的
Other information(其他信息)
java.util.ConcurrentModificationException 表示在遍历集合的过程中,集合的结构发生了变化,导致遍历过程中抛出异常。这通常是由于在使用迭代器遍历集合的过程中,对集合进行了增加、删除等结构变化的操作。
在遍历集合时出现了 ConcurrentModificationException 异常,可以考虑使用并发安全的集合类,例如 ConcurrentHashMap、CopyOnWriteArrayList 等,或在遍历集合时使用迭代器,并在进行集合操作时使用迭代器的 remove() 方法,而非集合本身的删除操作。
如果使用的是 Java 8 及以上版本,也可以考虑使用流(Stream)来处理集合
yinyuncan commented
大佬您好!我觉得可以这样改一下
public List<MqttClientSubscription> getMatchedSubscription(String topicName) {
return subscriptions.values().stream()
.flatMap(Collection::stream)
.filter(subscription -> subscription.matches(topicName))
.collect(Collectors.toList());
}
ChunMengLu commented
@yinyuncan 收到,我调整下
ChunMengLu commented
@yinyuncan 2.2.0 已经发布,已经修复,不过需要注意这个版本:client、server 发送订阅 payload 类型由 ByteBuffer
改为 byte[]
。