JackJiang2011/MobileIMSDK

Server端OnlineProcessor中putUser和removeUser存在的并发问题

fuhaodev opened this issue · 3 comments

putUser

源码里实现是先查询再判断,这里应该是有问题的,如果多个终端同时登录同一个账号,可能会存在丢失Channel导致资源泄露的问题。多个线程同时进到putUser,onlineSessions.get(user_id)结果都为空,就会执行到else里onlineSessions.put(user_id, newSession);这样就后一个就会把前一个Channel覆盖掉,导致Channel泄露,前一个Channel资源不能及时得到释放,也就不会kick掉。

  • 修改方案:直接先使用ConcurrentHashMap的put方法添加user_id和Channel的映射,put的结果会返回old值,因为ConcurrentHashMap是线程安全的,而且put始终会返回old值,再判断old值,如果old值不为空,直接走kick流程。

removeUser

这里完全没有必要上锁,直接加synchronized也太暴力了,把所有用户的remove都阻塞了。

  • 修改方案:也是直接利用ConcurrentHashMap先remove,remove结果为true表示移除成功,false表示移除过了,通过remove返回结果来处理其他业务。

贴一段我这块的实现代码吧,可能也存在问题,一起来交流下

 // 绑定用户和Channel的关系
 public void bindSession(Session session, Channel newChannel) {
        final Channel oldChannel = userIdChannelMap.put(session.getUserId(), newChannel);
        // 如果oldChannel不为空,表明操作的是同一个userId,如果是同一个userId表示重复登录了
        // 重复登录有两种情况:
        // 1. 如果是同一个channel,表示用户没有重复建立连接,仅仅是重复发送了登录请求,这种情况忽略即可
        // 2. 如果不是同一个channel,表示用户重复建立连接了,可能是客户端bug,也可能是在另外一个终端登录,也就是被踢的场景,这种情况需要将原来的channel断开。
        if (oldChannel != null) {
            final boolean isTheSame = oldChannel.compareTo(newChannel) == 0;
            // 不是同一个连接,将原来的channel关闭断开或kick,防止资源泄露
            if (!isTheSame) {
                log.warn("duplicate login, close old connection, uid={}, oldId={}, newId={}", session.getUserId(),
                        oldChannel.id().asShortText(), newChannel.id().asShortText());
                oldChannel.attr(ATTRIBUTE_KEY_SESSION).set(null);
                oldChannel.close();
            }
        }
        // 如果oldChannel为空,表示用户第一次登录,则将在线数量加1
        else {
            onlineUserCount.incrementAndGet();
        }
        // 设置channel属性信息,这里不管是第一次登录还是重复登录,都更新一次attribute
        newChannel.attr(ATTRIBUTE_KEY_SESSION).set(session);
    }

 // 解绑用户和Channel的关系
 public void unBindSession(Channel channel) {
        // 检查是否登录过
        if (!hasLogin(channel)) {
            log.warn("unbind session, but not found session, {}", ClientUtils.getClientInfo(channel));
            return;
        }
        // 获取登录信息
        Session session = getSession(channel);
        // 使用ConcurrentHashMap的remove()来代替查询判断再移除的操作,来保证线程安全和原子性
        final boolean ret = userIdChannelMap.remove(session.getUserId(), channel);
        // 如果用户信息从map中移除成功,才进行在线数量的减1操作并移除Channel的attributes
        if (ret) {
            log.info("unbind session, {}", ClientUtils.getSimpleClientInfo(channel));
            onlineUserCount.decrementAndGet();
            channel.attr(ATTRIBUTE_KEY_SESSION).set(null);
        }
}

我回头来仔细读一读你写的内容