/fan-push

a simple and small push system for Android and server based on Netty.

Primary LanguageJava

fan-push

这是使用Netty框架实现的一个小型的推送系统.

主要适用的场景或者解决的问题

  • 1,针对于在 局域网/内网 的简单推送业务.

  • 2,在项目中, 我们经常会遇到这样一个场景: 用户 A 和 B 都处于详情页面, A 对页面进行了修改, 但是 B 无法知道, 需要服务器告知 B 刷新页面. 举个栗子: 我们在吃饭时候经常用的二维码点单页面, A 与 B 去吃饭点单, 当 A 点了一个菜, 就要告诉一下B, 在用户 B 的页面上好像弹出一个小小的气泡, 说: A 刚刚点了一个菜 这种业务场景是不用保活的. 因为都是在应用处于前台页面的时候进行推送. 保持长连接就好了.

我们主要针对业务2来实现.

主要功能

  • Netty 客户端与服务端代码编写

    参考 PushClient 和 PushServer

  • TCP拆包与粘包

    参考 LengthFieldPrepender 和 LengthFieldBasedFrameDecoder

  • 长连接握手认证

    参考PushClient 中连接成功 / PushServerHandler 中对握手消息的处理 / PushClientHandler 对握手成功和握手失败的处理

  • 心跳机制(ping, pong)

    参考 PushClientHandler 中的 ping 消息 / PushServerHandler 中的pong消息

  • 客户端断线重连机制

    参考 PushClient 中 startTimerToReconnect / ConnectionWatchdog

  • 消息重发机制

    参考 MessageRetryManager 和 MessageLooper

  • 离线消息功能

    参考MessageRetryManager

整体功能流程

  • 0, 服务器运行

  • 1, 最开始客户端连接后, 客户端需要给服务端发送一个握手消息(1001)

  • 2, 服务端收到客户端的握手消息后, 验证

    a, 如果验证成功, 返回给客户端握手成功的消息
    b,如果验证失败, 返回给客户端握手失败的消息

  • 3, 客户端端收到握手消息

    a, 握手消息(1001) status=-1, 就知道自己握手都没成功, 直接关闭连接
    b, 握手消息(1001) status=1, 就知道自己握手成功, 开始后续的逻辑:心跳

  • 4, 客户端发送的心跳消息(ping)

  • 5, 服务端接收到心跳消息(ping)后, 返回心跳响应消息(pong)

  • 下面进行正常业务推送

  • 6, 服务端给客户端推送消息(1004)

  • 7, 如果客户端收到服务器的推送消息, 发送收到回执.

断线重连

  1. 第一次客户端连接, 考虑是网络等问题连接不上, 需要尝试进行重连;
  2. 如果客户端握手失败, 则不需要重连. (用户名密码都失败了, 还重连啥呀)
  3. 客户端与服务端的长连接断开, 会进入客户端的 channelInactive 方法中. 在 channelInactive 方法中需要尝试进行重连 正常断开包括因此在 中需要重连. 是服务器正常给客户端关掉了, 或者客户端自己关了网络, 开了飞行, 这些会自动进入channelInactive

有如下情况会进入客户端的 channelInactive 方法:

  • 客户端关闭连接, 比如客户端调用 channel.close() 或者 channelContext.close()
  • 客户端异常崩溃
  • 服务端主动将与客户端的长连接关闭了
  • 手动将服务端程序异常关闭
  • 手机断开WIFI, 会自动进入 channelInactive
  • 手机开飞行模式, 会自动进入 channelInactive
  1. 客户端的 读超时(很长时间没收到pong消息), 需要重连

有如下情况, 造成读超时

  • 客户端和服务器都正常, 但是网络中间节点故障造成网络根本不通. 在同一台电脑测试的时候, 有线网络, 我尝试将 Mac 电脑的网络服务停用, 就出现了这个场景
  • 服务端的网线被人拔掉了, 也相当于网络问题
  1. 重连时长策略

            // 重新连接
            if (attempts < MAX_ATTEMPTS) {
                attempts++;
            }
            long t = 2 << attempts;
            
    

    我们主要使用左移的方式, 来验证每一次尝试请求的间隔时间.

    // int a = 2<<12;   //  4   8  16  32  64  128  256 512 1024 2048 4096  8192
    

    第一次重连, 间隔4秒

    第二次重连, 间隔8秒

    第三次重连, 间隔16秒

    ... ...

    第8次, 间隔512秒, 不到9分钟

    如果多次重连总是失败, 可认为是出现了一些状况. 比如客户端进电梯了? 亦或是服务端宕机了等原因 导致无法连接成功

    因此就延长间隔时间, 每一次的间隔时间呈指数级增长

  2. Q: 为啥 又要监听 读超时, 又要在 WatchDog 的 ChannelInActive 中重连?

    因为, 如果是服务器把客户端的连接关闭了, 只进入 WatchDog 的 ChannelInActive, 不会再进入读超时

    可以自行测试, 如果服务器把客户端的连接关闭了, 依次回调方法: channelInactive channelUnregistered handlerRemoved.

    这样Handler已经被移除了, 就不会收到读超时的事件了

    如果是服务器与客户端之间网络的中间节点问题, 客户端不会进入 inActive, 只能通过这里的 读超时来判断

    注意: 手机端断开WIFI,或者开飞行模式, 都是会触发 ChannelInActive的

消息重发管理器

  1. 思路:

    客户端和服务器建立长连接之后,需要发送业务信息。由于网络等各种原因,连接不能保证畅通,发送的消息不一定能到对方。

    服务器把消息发送出去,服务器记录这条消息的状态: 客户端如果收到这条消息,向服务器发送一个已收到的回执,服务器收到这个回执后将状态修改成已经收到; 如果一定时间没有收到回执,则再次发送这条消息。

    服务端维护一个正在发消息的队列,每次发送消息的同时, 把消息往正在发送的消息队列中存一份. 当收到客户端的回执,再把相应的消息从队列中移除。 服务器每10s会轮询这个消息队列,把消息队列中已经发送的消息但是没有收到回执的消息再次发送一遍。

    如果同一条消息被发送了5次,一直没有收到回执,则认为服务器与客户端保持的这个长连接已经断开了,这种情况服务器则把这个长连接对象channel关闭、释放掉资源。

    如果客户端与服务器保持的长连接对象channel关闭掉,则需要处理“正在发消息的队列”中的消息,将属于这个channel的消息持久化到数据库中。 当下次这个客户端用户再次上线, 也就是再次建立客户端的长连接的时候, 我们首先在数据库中查询时候有属于它的未发送消息,全部进行发送。

    另一个问题: 服务器向客户端发送消息1次,客户端向服务器发送这条消息的回执。 服务器由于网络原因没有收到回执,这条消息的回执丢了,服务器会再把这条消息发送第二次,客户端这次会收到重复的消息,这时候客户端怎么处理呢??继续显示这条消息显然不正确,客户端需要验证这条消息是否收到! 进行合法性进行验证。这里需要用到消息唯一标示(messageId)。

    具体客户端如何进行消息重复性验证, 参考 PushClientHandler

消息轮询器

  1. 为每一个用户(userId)分配一个轮询器
  2. 当服务端给客户端发送消息的时候, 要把消息添加到超时管理器中
  3. 在 MessageRetryManager 的 add 方法中, 如果是个新的用户, 就给他分配一个轮询器, 轮询器每10秒去检查一下这个用户有没有消息需要重发
  4. 如果没有, 就算了. 等10秒再来看看. 如果有, 就开启定时器开始重发(是把所有没接收成功的都再发一遍).
  5. 如果客户端收到了, 就返回给服务器接收回执, 服务器收到接收回执, 就把消息从 超时管理器中 移除
  6. 如果发了5次, 客户端依然没有收到, 就认为客户端已经断线了. 此时, 将与客户端的连接断开, 并且, 将没发成功的消息写入数据库, 作为离线消息.
  7. 等下次客户端上线了, 统一把所有的离线消息发给客户端

客户端消息唯一性判断

考虑以下情况会出现消息重复:

服务器向客户端发送消息1次,客户端向服务器发送这条消息的回执。 服务器由于网络原因没有收到回执,这条消息的回执丢了,服务器会再把这条消息发送第二次,客户端这次会收到重复的消息,这时候客户端怎么处理呢??

继续显示这条消息显然不正确,客户端需要验证这条消息是否收到,进行合法性进行验证。这里需要用到消息唯一标示(messageId)。

我们使用一个大的 队列, 里面专门存放已经收到的消息的 id 收到消息后, 如果queue 里没有, 就入队, 正常处理 如果queue 里有了, 就代表是重复的消息了, 此时, 只要告诉服务器我们收到了就行了, 然后把消息丢弃, 不做处理 为啥还要告诉服务器呢? 因为你不给服务器发已收到回执, 这条消息就一直存在服务器的超时管理器里, 会一直给客户端重发

但是这个方案有个问题:

比如, 此时, 服务器给客户端发了一个消息A, 客户端收到了. 但是客户端发送的回执服务器没收到. 客户端就下线了. 那, 这条消息A就作为离线消息存起来了. 下次客户端再上线, 服务器会把离线消息A再次发来. 此时, 客户端上线, PushClientHandler 对象是新建的, 那么 oldMessageQueue 集合就是空的了. 导致的问题就是: 客户端会认为A 并不是一条重复的消息, 会显示出来. 其实, 消息A在客户端上次下线之前就已经处理过了 那怎么做呢? 正确的做法应该是要把收到的消息持久化吧? 我们客户端自己弄个数据库, 里面用于存历史收到的消息的messageId. 这样就可以了. 同时要考虑到, 这个数据库的记录数量不能无限增长. 因为一直收消息一直收, 就爆炸了 可以设置数据库记录最大10万条. 多了之后, 就把最老的记录覆盖, 这样是不是就可以了.

离线消息

当客户端用户下线的时候, 要看 消息重发管理器中是否有需要重发的消息

如果有, 则要存储在数据库中, 作为离线消息. 等这个用户下次上线的时候, 再重新发给他.