lets-mica/mica-mqtt

高并发场景下取消订阅时报 ConcurrentModificationException

yinyuncan opened this issue · 3 comments

Please answer some questions before submitting your issue. Thanks!

Which version of Mica do you using(使用的 mica 版本)?

2.1.2

Expected behavior(预期的效果)

取消订阅时不应该报ConcurrentModificationException

Actual behavior(实际的效果)

java.util.ConcurrentModificationException: null
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
	at java.base/java.util.LinkedHashMap$LinkedValueIterator.next(LinkedHashMap.java:783)
	at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientSession.getMatchedSubscription(DefaultMqttClientSession.java:135)
	at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientProcessor.invokeListenerForPublish(DefaultMqttClientProcessor.java:333)
	at net.dreamlu.iot.mqtt.core.client.DefaultMqttClientProcessor.processPublish(DefaultMqttClientProcessor.java:214)
	at net.dreamlu.iot.mqtt.core.client.MqttClientAioHandler.handler(MqttClientAioHandler.java:82)
	at org.tio.core.task.HandlerRunnable.handler(HandlerRunnable.java:257)
	at org.tio.core.task.DecodeRunnable.handler(DecodeRunnable.java:260)
	at org.tio.core.task.DecodeRunnable.decode(DecodeRunnable.java:369)
	at org.tio.core.ReadCompletionHandler.completed(ReadCompletionHandler.java:252)
	at org.tio.core.ReadCompletionHandler.completed(ReadCompletionHandler.java:209)
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
	at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:221)
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

image

Steps to reproduce the behavior(重现步骤)

//		根据上一个异步操作的结果,进行下一步操作
		mono.flatMapMany(result -> {
			if (result.isSuccess()) {
				FaceQueryResponse faceQueryResponse = (FaceQueryResponse) result.getData();
//				查询所有的人脸数据,从下标0开始
				return Flux.range(0, faceQueryResponse.count()).publishOn(Schedulers.boundedElastic()).map(i -> {
//					构建查询人脸详情数据对象
			
					String responseDetailTopic = String.format("face/%s/response", clientId.get());
					int finalI = i;
//					人脸详情数据处理
					Mono<R> detailMono = Mono.create(monoSink -> {
						ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
							monoSink.success(R.fail("设备通讯超时,人脸详情数据同步失败!"));
						}, 4, TimeUnit.SECONDS);
						log.info("第二层进来了:{}",responseDetailTopic);
						mqttTemplate.subQos0(responseDetailTopic, (context2, topic2, message2, payload2) -> {
							// 取消定时任务
							scheduledFuture.cancel(true);
							String responseDetailPayload = ByteBufferUtil.toString(payload2);
							log.info("第2层查询返回 response: {}", responseDetailPayload);
							FaceQueryResponse faceDetailResponse = JsonUtil.parse(responseDetailPayload, FaceQueryResponse.class);

							}
						});
					});

//					取消详情订阅
					detailMono.subscribe((r) -> {
						log.info("取消订阅:{}",responseDetailTopic);
						// 取消订阅MQTT响应的主题
						mqttTemplate.unSubscribe(responseDetailTopic);
					});
					return detailMono;
				});
			}
			log.info("错误just返回 result: {}", result);
			return Mono.just(result);
		}).collectList().flatMap(resultList -> {
			// 处理所有同步操作的结果
			log.info("resultList: {}", resultList);
			return Mono.just(resultList);

		})

错误出现在 mqttTemplate.unSubscribe(responseDetailTopic); 取消订阅的时候抛出的

Other information(其他信息)

java.util.ConcurrentModificationException 表示在遍历集合的过程中,集合的结构发生了变化,导致遍历过程中抛出异常。这通常是由于在使用迭代器遍历集合的过程中,对集合进行了增加、删除等结构变化的操作。

在遍历集合时出现了 ConcurrentModificationException 异常,可以考虑使用并发安全的集合类,例如 ConcurrentHashMap、CopyOnWriteArrayList 等,或在遍历集合时使用迭代器,并在进行集合操作时使用迭代器的 remove() 方法,而非集合本身的删除操作。

如果使用的是 Java 8 及以上版本,也可以考虑使用流(Stream)来处理集合

大佬您好!我觉得可以这样改一下

public List<MqttClientSubscription> getMatchedSubscription(String topicName) {
    return subscriptions.values().stream()
            .flatMap(Collection::stream)
            .filter(subscription -> subscription.matches(topicName))
            .collect(Collectors.toList());
}

@yinyuncan 收到,我调整下

@yinyuncan 2.2.0 已经发布,已经修复,不过需要注意这个版本:client、server 发送订阅 payload 类型由 ByteBuffer 改为 byte[]