/fastim

🚀 基于Netty高可用分布式即时通讯系统,支持长连接网关管理、单聊、群聊、登录登出、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步和漫游等功能,支持集群部署的分布式架构。

Primary LanguageJava

FastIM

🚀基于Netty高可用分布式即时通讯系统,支持长连接网关管理、单聊、群聊、登录登出、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步和漫游等功能,支持集群部署的分布式架构。

一、项目结构

  • fastim-logic:逻辑服务,比如单聊、群聊、红包、离线等业务逻辑
  • fastim-client:客户端逻辑实现,实现发送消息、断线重连、SDK包等功能
  • fastim-gate-tcp:HTTP API网关实现,实现限流降级、版本路由、openAPI管理、协议转换、泛化调用等功能
  • fastim-gate-http:长连接TCP网关实现,实现自定义协议、channel管理、心跳检测、泛化调用等功能
  • fastim-leaf:分布式ID实现,基于zookeeper的实现或基于redis实现或完全基于内存的实现
  • fastim-router:路由逻辑服务,主要负责消息的路由转发,以及客户端在线状态的维护
  • fastim-common:共用类,主要是实体类和工具类的存放
  • fastim-lsb:LSB service,提供接入层IP和port来进行负载均衡的连接
  • fastim-das:数据访问层实现,封装一层,提供数据的增删改查入口

二、功能

  • IM架构设计
  • 分布式ID设计
  • IM表结构设计
  • TCP网关设计实现
  • HTTP网关设计实现
  • 自定义通讯协议 + Protobuf序列化
  • 通讯协议支持HTTP/2
  • 单聊、群聊
  • 心跳保活
  • 聊天记录存储和查询
  • 客户端断线自动重连
  • 登录、注册、登出实现
  • 消息推送、数据上报
  • 提供客户端SDK
  • 多端同步消息和消息漫游
  • 协议支持消息加密
  • 唯一设备踢出
  • 文件发送,图片发送
  • 协议支持消息加密
  • 群红包
  • 消息撤回
  • 已读未读

三、快速启动

四、系统设计

1. IM架构图

基于可扩展性高可用原则,把网络层、业务逻辑层、数据层分离,并且支持分布式部署 IM架构图

2. 架构设计

2.0 CLIENT设计:

  1. client每个设备会在本地存每一个会话,保留有最新一条消息的顺序 ID
  2. 为了避免client宕机,也就是退出应用,保存在内存的消息ID丢失,会存到本地的文件中
  3. client需要在本地维护一个等待ack队列,并配合timer超时机制,来记录哪些消息没有收到ack:N,以定时重发。
  4. 客户端本地生成一个递增序列号发送给服务器,用作保证发送顺序性。该序列号还用作ack队列收消息时候的移除。
2.0.0 客户端序列号设计

客户端序列号

基于以下考虑:

  • 发送消息速率不是非常快,可以设计成每秒并发4个消息
  • 数据传输中的大小尽量小用int,不用long
  • 只保证递增即可 说明:上述生成器可以用18年[(2^29-1)/3600/24/365]左右,并且每秒并发4个消息

2.1 LSB设计:

  1. 接入层的高可用、负载均衡、扩展性全部在这里面做
  2. 客户端通过LSB,来获取gate IP地址,通过IP直连,目的是
    • 灵活的负载均衡策略 可根据最少连接数来分配IP
    • 做灰度策略来分配IP
    • AppId业务隔离策略 不同业务连接不同的gate,防止相互影响

2.2 GATE设计:

GATE层网关有以下特性:

  1. 任何一个gate网关断掉,用户端检测到以后重新连接LSB服务获取另一个gate网关IP,拿到IP重新进行长连接通信。对整体服务可靠性基本没有影响。
  2. gate可以无状态的横向部署,来扩展接入层的接入能力
  3. 从性能的角度出发,gate层需要存储本地session,并且系统在变更session时候保证了本地session和分布式session的一致性

并且根据协议分类将入口请求打到不同的网关上去,HTTP网关接收HTTP请求,TCP网关接收tcp长连接请求

2.3 ROUTER设计:

  1. 把用户状态信息存储在Redis集群里。因此也是无状态的,任何一个router服务挂掉,不影响整体服务能力。
  2. 转发消息,将消息投递给固定gate上的会话,处理路由逻辑
  3. router层需要存储的关系
    • uid和gate层机器ID关系
    • 用户全局在线信息

2.4 LOGIC设计:

logic按照分布式微服务的拆分**进行拆分,拆分为多个模块

  1. 单聊服务
  2. 群聊服务
  3. 登录 登出 注册服务
  4. 红包服务
  5. 分布式ID服务
  6. 离线服务

2.5 DAS (data access service)设计:

  1. 定时将冷数据迁移到Cold存储系统
  2. 多个存储系统的实现,统一cache层
  3. 向上游屏蔽存储引擎,提供友好的接口

3. 协议设计

3.0 目标

  1. 高性能:协议设计紧凑,保证数据包小,并且序列化性能好
  2. 可扩展性:针对后续业务发展,可以自由的自定义协议,无需较大改动协议结构

3.1 设计实践

IM协议采用二进制定长包头和变长包体来实现客户端和服务端的通信,并且采用谷歌protobuf序列化协议,设计如下:

IM协议设计图

各个字段如下解释:

  • headData:头部标识,协议头标识,用作粘包半包处理。4个字节
  • version:客户端版本。4个字节
  • cmd:业务命令,比如心跳、推送、单聊、群聊。1个字节
  • msgType:消息通知类型 request response notify。1个字节
  • logId:调试性日志,追溯一个请求的全路径。4个字节
  • sequenceId:序列号,可以用作异步处理或者顺序性处理。4个字节
  • contentType:消息的内容类型 文本 图片链接 文件链接。1个字节
  • dataLength:数据体的长度。4个字节
  • data:数据

3.2 注意项

这里要注意的是,针对数据data,网关gate层不做反序列化,反序列化步骤在service做,避免重复序列化和反序列化导致的性能损失

4. Session设计和管理

5. 安全管理

安全层协议设计 基于动态密钥,借鉴类似SSL,不需要用证书来管理,原理如下

  1. 客户端请求服务端生成安全通道
  2. 服务端生成非对称加密的公私钥(公钥A1),将A1传给客户端
  3. 客户端本地生成公私钥A2, 利用A1加密A2生成密文传给服务端
  4. 服务端利用A1解密密文,拿到A2公钥
  5. 服务端随机生成对称加密密钥,传给客户端
  6. 后续的内容传输都用对称加密进行通信 注:对称加密传输内容,非对称加密传输密钥

6. 消息管理

一个正常的消息流转需要如图所示的流程: IM核心流程图

  1. 客户端A发送请求包R
  2. server将消息存储到DB
  3. 存储成功后返回确认ack
  4. server push消息给客户端B
  5. 客户端B收到消息后返回确认ack
  6. server收到ack后更新消息的状态或者删除消息

上述只是正常的流程,一个健壮的系统需要考虑各种异常情况,如丢消息,重复消息,消息时序问题

6.0 消息可靠性、重复性和顺序性保证

6.0.0 消息可靠性如何保证 不丢消息
  1. 应用层ACK
  2. 客户端需要超时与重传
  3. 服务端需要超时与重传 具体做法就是增加ack队列和定时器Timer
6.0.1 消息重复性如何保证 不重复
  1. 超时与重传机制将导致接收的client收到重复的消息,具体做法就是一份消息使用同一个消息ID进行去重处理。
6.0.2 消息顺序性如何保证 不乱序

消息乱序影响的因素:

  1. 时钟不一致,分布式环境下每个机器的时间可能是不一致的
  2. 多发送方和多接收方,这种情况下,无法保先发的消息被先收到
  3. 网络传输和多线程,网络传输不稳定的话可能导致包在数据传输过程中有的慢有的快。多线程也可能是会导致时序不一致影响的因素

以上,如果保持绝对的实现,那么只能是一个发送方,一个接收方,一个线程阻塞式通讯来实现。那么性能会降低。

如何保证时序?

  1. 单聊:通过发送方的绝对时序seq,来作为接收方的展现时序seq。
  2. 群聊:因为发送方多点发送时序不一致,所以通过服务器的单点做序列化,也就是通过ID递增发号器服务来生成seq,接收方通过seq来进行展现时序。
  3. 群聊时序的优化:按照上面的群聊处理,ID递增发号器服务可能会有性能瓶颈。按照道理只需要保证单个群的时序,不需要保证所有群的时序,所以解决思路就是同一个群的消息落到同一个service上面,消息seq通过service本地生成即可,来代替中心化ID发号器的服务。

6.1 如何进行消息的推送和拉取

本项目是进行推拉结合来进行服务器端消息的推送和客户端的拉取,我们知道单pull和单push有以下缺点: 单pull:

  • pull要考虑到消息的实时性,不知道消息何时送达
  • pull要考虑到哪些好友和群收到了消息,要循环每个群和好友拿到消息列表,读扩散

单push:

  • push实时性高,只要将消息推送给接收者就ok,写扩散,但是会集中消耗服务器资源

推拉结合:

  • 推拉结合的方式能够分摊服务端的压力
  • 具体做法就是有新消息时候,推送哪个好友或者哪个群有新消息,以及新消息的数量,客户端按需或者延迟进行拉取

7. 高并发支持

  • 水平扩展:各个模块无状态部署
  • 线程模型:每个服务底层线程模型遵从Netty主从reactor模型
  • 多层缓存:Gate层二级缓存,Redis一级缓存
  • 长连接:客户端长连接保持,避免频繁创建连接消耗

8. 网关设计

8.0 接入层网关和应用层网关不同地方

  1. 接入层网关需要有接收通知包或者下行接收数据的端口,并且需要另外开启线程池。应用层网关不需要开端口,并且不需要开启线程池。
  2. 接入层网关需要保持长连接,接入层网关需要本地缓存channel映射关系。应用层网关无状态不需要保存。

8.1 接入层网关设计

8.1.0 目标:
  1. 网关的线程池实现1+8+4+1,减少线程切换
  2. 集中实现长连接管理和推送能力
  3. 与业务服务器解耦,集群部署缩容扩容以及重启升级不相互影响
  4. 长连接的监控与报警能力
  5. 客户端重连指令一键实现
8.1.1 技术点:
  • 自定义协议以及序列化
  • 通道连接自定义保活以及心跳检测
  • 本地缓存channel
  • 责任链
  • 服务调用完全异步
  • 泛化调用
  • 转发通知包或者Push包
  • 容错网关down机处理
8.1.1 设计方案:

参考基于Netty的长连接网关设计与实现

一个Notify包的数据经网关的线程模型图: TCP网关线程模型

8.2 应用层API网关设计

8.2.0 目标:
  1. 基于版本的自动发现以及灰度/扩容 ,不需要关注IP
  2. 网关的线程池实现1+8+1,减少线程切换
  3. 支持协议转换实现多个协议转换,基于SPI来实现
  4. 与业务服务器解耦,集群部署缩容扩容以及重启升级不相互影响
  5. 接口错误信息统计和RT时间的监控和报警能力
  6. UI界面实现路由算法,服务接口版本管理,灰度策略管理以及接口和服务信息展示能力
  7. 基于OpenAPI提供接口级别的自动生成文档的功能
8.2.1 技术点:
  • 编解码
  • Head 请求
  • Netty http 服务端
  • 责任链
  • 服务调用完全异步
  • 泛化调用
8.2.2 设计方案:

参考基于Netty的API网关设计与实现

一个请求包的数据经网关的架构图: 网关的架构图

9. 业务优化

9.0 性能优化

  1. 对消息的查询处理:
  • 短期消息(7天):存储在Redis里;
  • 近期消息(1-3个月):存储在Mysql里,以备用户实时查询;
  • 历史信息:存储在HBase里,作为历史数据慢查询
  1. 对消息的写入处理:
  • 对并发不高的场景直接rpc调用
  • 对高并发场景可以引入Kafka
  • 写入量巨大时,水平切分能够扩容,MQ缓冲可以保护数据库

9.1 LSB优化

问题背景:当某个实例重启后,该实例的连接断开后,客户端会发起重连,重连就大概率转移其他实例上,导致最近启动的实例连接数较少,最早启动的实例连接数较多

解决方法:

  1. 客户端会发起重连,跟服务器申请重连的新的服务器IP,系统提供合适的算法来平摊gate层的压力,防止雪崩效应。

  2. gate层定时上报本机的元数据信息以及连接数信息,提供给LSB中心,LSB根据最少连接数负载均衡实现,来计算一个节点供连接。

9.2 群组优化

群组优化

  1. 批量消息处理
  2. 批量ACK:每条群消息都ACK,会给服务器造成巨大的冲击,为了减少ACK请求量,批量ACK。
  3. 群消息和成员批量加载以及懒加载:在真正进入一个群时才实时拉取群友的数据
  4. 群离线消息过多:分页拉取(按需拉取)

9.3 Gate缓存优化

gate一级缓存,redis二级缓存.如何维护缓存一致性?可以参考MQ的广播消息

9.4 消息的存储以及扩容

9.5 消息ID生成机制

缺点:

  1. 时间靠的近的消息,区分不出来消息的先后顺序
  2. 当并发度不高的时候,生成的ID后面的最后一位是0,导致数据倾斜

解决方案:

  1. 去掉snowflake最后8位,然后对剩余的位进行取模

9.6 存储库的选型

  1. 前期消息不多的场景可以使用MySQL进行分库分表,当系统瓶颈在数据库的时候,就要进行额外的选型了
  2. 分析IM消息特征
  • 根据消息ID查某一条消息,要么根据一个消息要检索这个范围之间的消息,不需要强的事务型数据库
  • 当数据量大的时候,如果对关系型数据库比如MySQL进行分库分表,运维成本加上开发成本会非常高
  1. 可考虑NoSQL 方案,如
  • K-V存储 Redis
  • 文档数据库 MongoDB
  • 列式数据库 HBase
  • 全文搜索引擎 ElasticSearch

10. 心跳设计

  1. 服务端检测到某个客户端迟迟没有心跳过来可以主动关闭通道,让它下线;
  2. 客户端检测到某个服务端迟迟没有响应心跳也能重连获取一个新的连接。

11. 异常场景处理

11.0 gate层重启升级处理

gate层重启升级或者意外down机有以下问题:

  1. 客户端和gate意外丢失长连接,导致 客户端在发送消息的时候导致消息超时等待以及客户端重试等无意义操作
  2. 发送给客户端的消息,从router层转发给gate的消息丢失,导致消息超时等待以及重试。

解决方案如下:

  1. 重启升级时候,向客户端发送重新连接指令,让客户端重新请求LSB获取IP直连。
  2. 当gate层down机异常停止时候,增加hook钩子,向客户端发送重新连接指令。
  3. 额外增加hook,向router层发送请求清空路由消息和在线状态

12. 核心表结构设计

核心设计点

  1. 群消息只存储一份,用户不需要为每个消息单独存一份。用户也无需去删除群消息。
  2. 对于在线的用户,收到群消息后,修改这个last_ack_msg_id。
  3. 对于离线用户,用户上线后,对比最新的消息ID和last_ack_msg_id,来进行拉取(参考Kafka的消费者模型)
  4. 对应单聊,需要记录消息的送达状态,以便在异常情况下来做重试处理

用户表 t_user

字段 类型 描述
id int 自增ID
user_id long 用户ID
is_deleted tinyint 是否删除
create_time datetime 创建时间
update_time datetime 更新时间

群表 t_group

字段 类型 描述
id int 自增ID
group_id int 群ID
creator long 创建人
is_deleted tinyint 是否删除
create_time datetime 创建时间
update_time datetime 更新时间

群用户表 t_group_user

字段 类型 描述
id int 自增ID
group_id int 群ID
user_id long 用户ID
join_time datetime 加群时间
last_ack_msg_id long 最后一次ack的消息ID
user_device_type tinyint 用户设备类型
is_deleted tinyint 是否删除
create_time datetime 创建时间
update_time datetime 更新时间

群消息表 t_group_msg

字段 类型 描述
id int 自增ID
group_id int 群ID
group_msg_id long 群消息ID
sender_id long 发送方ID
msg_type int 消息类型
msg_content varchar 消息内容
is_deleted tinyint 是否删除
create_time datetime 创建时间
update_time datetime 更新时间

个人消息表 t_user_msg

字段 类型 描述
id int 自增ID
sender_id long 发送方ID
receiver_id long 接收方ID
receiver_device_type tinyint 接收方设备类型
msg_type int 消息类型
msg_content varchar 消息内容
msg_status tinyint 消息状态,已送达和未送达
is_deleted tinyint 是否删除
create_time datetime 创建时间
update_time datetime 更新时间

13 核心业务流程

13.0 用户A发消息给用户B

  • A通过账号密码登录获取token,以及接入层IP和port信息
  • 接入层IP和port信息进行远程TCP连接,接入层维护登录状态
  • 服务层校验token,token校验通过
  • A打包数据发送给服务端, 服务端检测A用户是否风险用户
  • 服务端对消息进行敏感词检查,以及发送频率的限制检查
  • 服务端生成递增的消息ID,将发送的信息和ID打包一块入库,入库成功后返回ACK
  • 服务端检测接收用户B是否在线,在线直接推送给用户B
  • 如果没有本地消息ID则存入,并且返回接入层ACK信息;如果有则拿本地消息ID和推送过来的消息ID大小对比,小获等于则丢弃,大则接收。最后返回接入层ack
  • 服务端接收ACK后,将消息标为已送达
  • 如果用户B不在线,直接通过手机通知来告知客户新消息到来
  • 用户B上线后,拿本地的最大的消息ID,去服务端去分页HTTP拉取离线消息

13.1 用户A发消息给群G

  • 登录,TCP连接,token校验,名词检查,群消息入库成功返回ACK

五、性能测试

六、Q&A

1 架构

  1. Q:对于单聊和群聊的实时性消息,是否需要MQ来作为通信的中间件来代替rpc?

    A:MQ作为解耦可以有以下好处:

    • 易扩展,gate层到logic层无需路由,logic层多个有新的业务时候,只需要监听新的topic即可
    • 解耦,gate层到logic层解耦,不会有依赖关系
    • 节省端口资源,gate层无需再开启新的端口接收logic的请求,而且直接监听MQ消息即可

    但是缺点也有:

    • 网络通信多一次网络通信,增加RT的时间,消息实时性对于IM即使通信的场景是非常注重的一个点
    • MQ的稳定性,不管任何系统只要引入中间件都会有稳定性问题,需要考虑MQ不可用或者丢失数据的情况
    • 需要考虑到运维的成本

    综上,是否考虑使用MQ需要架构师去考量,比如考虑业务是否允许、或者系统的流量等等影响因素。 本项目基于使用成本、耦合成本和运维成本考虑,采用RPC的方案来实现。并且根据泛化调用,也能同样实现层级解耦。

  2. Q:架构设计为什么要增加router层?上行消息gate层为什么直接访问service,而下行消息service则需要经过router和Kafka来跟gate通信?

    A:增加router层的目的是根据路由全局信息将通知包或者推送包路由到具体的某一台接入层的机器上,然后router层通过rpc或者MQ发送到gate层,gate发送数据到客户端来进行通信。

  3. Q:为什么接入层用LSB返回的IP来做接入呢?

    A:可以有以下好处:1、灵活的负载均衡策略 可根据最少连接数来分配IP;2、做灰度策略来分配IP;3、AppId业务隔离策略 不同业务连接不同的gate,防止相互影响

  4. Q:为什么应用层心跳对连接进行健康检查?

    A:因为TCP Keepalive状态无法反应应用层状态问题,如进程阻塞、死锁、TCP缓冲区满等情况;并且要注意心跳的频率,频率小则可能及时感知不到应用情况,频率大可能有一定的性能开销。

  5. Q:MQ的使用场景?

    A:IM消息是非常庞大的,比如说群聊相关业务、推送,对于一些业务上可以忍受的场景,尽量使用MQ来解耦和通信,来降低同步通讯的服务器压力。

  6. Q:群消息存一份还是多份,读扩散还是写扩散?

    A:存1份,读扩散。存多份下同一条消息存储了很多次,对磁盘和带宽造成了很大的浪费。可以在架构上和业务上进行优化,来实现读扩散。

  7. Q:消息ID为什么是趋势递增就可以,严格递增的不行吗?

    A:严格递增会有单点性能瓶颈,比如MySQL auto increments;redis性能好但是没有业务语义,比如缺少时间因素,还可能会有数据丢失的风险,属于集中式生成服务。小型IM可以根据业务场景需求直接使用redis的incr命令来实现IM消息唯一ID。本项目采用snowflake算法实现唯一趋势递增ID,即可实现IM消息中,时序性,重复性以及查找功能。

  8. Q:为什么gate层泛化调用logic层,router层泛化调用gate层?

    A:gate层直接调用logic层,不需要调用router层,减少网络传输和RT时间,增加一个服务,就多了一条链路, 就可能会导致服务链路过长,稳定性下降。泛化调用的目的是解耦网关和服务,使其不相互影响。router层泛化调用gate层,目的也是解耦gate层和router层,防止各自层级机器扩容缩容重启升级等场景下互相影响,并且router需要具体调用某一台gate层机器,泛化刚好能实现。

  9. Q:gate层为什么需要开两个端口?

    A:gate会接收客户端的连接请求(被动),需要外网监听端口;entry会主动给logic发请求(主动);entry会接收router给它的通知请求(被动),需要内网监听端口。一个端口对内,一个端口对外。

2 技术细节

  1. Q:本地写数据成功,一定代表对端应用侧接收读取消息了吗?

    A:本地TCP写操作成功,但数据可能还在本地写缓冲区中、网络链路设备中、对端读缓冲区中,并不代表对端应用读取到了数据。

  2. Q:为什么用netty做来做http网关, 而不用tomcat?

    • netty对象池,内存池,高性能线程模型
    • netty堆外内存管理,减少GC压力,jvm管理的只是一个很小的DirectByteBuffer对象引用
    • tomcat读取数据和写入数据都需要从内核态缓冲copy到用户态的JVM中,多1次或者2次的拷贝会有性能影响
  3. Q:为什么消息入库后,对于在线状态的用户,单聊直接推送,群聊通知客户端来拉取,而不是直接推送消息给客户端(推拉结合)?

    A:在保证消息实时性的前提下,对于单聊,直接推送。对于群聊,由于群聊人数多,推送的话一份群消息会对群内所有的用户都产生一份推送的消息,推送量巨大。解决办法是按需拉取,当群消息有新消息时候发送时候,服务端主动推送新的消息数量,然后客户端分页按需拉取数据。

  4. Q:为什么除了单聊 群聊 推送等实时性业务,其他的业务都走http协议?

    A:IM协议简单最好,如果让其他的业务请求混进IM协议中,会让其IM变的更复杂,比如离线消息拉取走http通道避免tcp 通道压力过大,影响即时消息下发效率。在比如上传图片和大文件,可以利用HTTP的断点上传和分段上传特性

七、Contact

  • 网站:zhangyaoo.github.io
  • 微信:will_zhangyao