-
Notifications
You must be signed in to change notification settings - Fork 34
长连接网关
sona-gateway 是比心自研的长连接网关,基于 Netty 开发,同时支持 TCP 和 Websocket。在比心内部叫做 Mercury,承载了比心几乎所有的业务,提供超高的性能,易扩展,普通的4核8G机器,就能稳定支撑几十万连接。
如果不需要整套 SONA 服务,也可以简单修改其中的房间相关业务代码后,单独剥离出来作为网关使用,即使从来没用过 Netty,对Netty一点都不了解,也能非常方便的扩展自己的业务。使用者不用关心如何实现一个私有协议的细节,直接使用我们内置的通信协议。可以非常简单的启动客户端与服务端,同时注册自定义的请求处理器,即可完成双端通信。同时,像连接管理、心跳等基础功能特性都默认可以使用。
网关中最核心的基础通信、消息协议、请求处理器等设计都是通用的:
- 基础通信功能
- 基于 Netty 高效的网络 IO 与线程模型运用
- 连接管理
- 基础通信模型
- 超时控制
- 心跳与空闲事件处理
- 协议框架
- 命令与命令处理器
- 编解码处理器
- 心跳触发器
- 私有协议定制实现
- 二进制通信协议的设计
- 灵活的消息压缩、合并控制
- 请求处理超时FailFast机制
- 用户请求处理器
- 双工通信
消息传输采用 mercury 二进制协议,基于 request-response 模式设计
key | 字节数 | 说明 |
---|---|---|
req/res | 1 | 1表示request , 0表示response |
twoWay | 1 | 是否需要回复response ,1 需要, 0 不需要 |
heartbeat | 1 | 是否心跳 ,1是 , 0 不是 |
version | 1 | 版本号 |
id | varint | 请求或响应的id(从1开始递增,单连接不重复),如果是request请求并且twoWay是false ,设置 0 |
cmd | 1 | command 命令,每个command都有对应的请求处理器 |
length | varint | 所有 header + body 的大小 |
headerSize | 1 | header 的个数 |
headerType | 1 | header 的类型 |
headerLength | varint | header data 的大小 |
headerData | headerLength | header 的数据内容 |
body | length - 所有header的大小 | 消息体 |
如果是心跳,只有 req/res,twoWay,heartbeat,version,id
headerType | description |
---|---|
1 | 开启body压缩 (body 超过 2048 字节进行压缩, Deflater 算法) |
2 | 房间header ,每个房间相关的command请求都会带上 |
3 | 开启批量合并,body中包含多条消息,需要额外解析 |
可以自行扩展 Header ,在 cn.bixin.sona.gateway.common.HeaderEnum 中添加自定义的Header,并在相应的业务逻辑中使用
cmd | description |
---|---|
-1 | 服务端告知客户端断开连接 |
1 | 客户端连接登录认证 |
2 | 客户端主动上报消息 |
10 | 加入房间 |
11 | 离开房间 |
12 | 发送房间消息 |
13 | 发送房间指令 |
14 | 发送群组消息 |
可以自行扩展 CMD ,在 cn.bixin.sona.gateway.common.CommandEnum 中添加自定义的 command,并新增对应的请求处理器
- 在 /META-INF/mercury/cn.bixin.sona.gateway.handler.Handler 文件中添加cmd 和处理器 的映射, key是 cmd (多个可用逗号隔开),value是 处理器的bean name
1=loginAuth
2=clientPush
10,11,12=chatRoom
- 新增自定义请求处理器,只需 extends AbstractHandler,并实现相应的业务逻辑处理即可
采用 Request-Response 模型,主动发起请求的为 request,request 有2中模式:
- 一种是 oneway,request发出去之后,不关心结果,不做超时控制,不需要对端返回 response
- 一种是 twoway,request发出去之后,需要在指定的超时时间内,等到对端返回response。如果超时时间内没有得到结果,那么会抛出超时异常
客户端和服务端都可以主动发起 request ,使用起来非常灵活
为了防止业务处理耗时过久,每个request都会设置超时检测,超时之后会立即 failfast,返回超时失败
Netty 请求处理
Netty 使用 主从Reactor模型
bossGroup 负责处理 accept 事件 , workerGroup 负责处理 read 、write 事件
其中 ChannelPipeline 是一个双向链表,netty 里面定义了十几种事件,触发之后会顺序调用所有 ChannelHandler 的指定方法,ChannelHandler的调用都是由 workerGroup中的同一个 eventloop 线程执行,不存在线程之间的切换。
Mercury 请求处理
Netty 中 ChannelPipeline 这种无锁化的设计,避免了上下文切换,在海量请求的情况下能提供很高的性能。但是也存在风险,如果执行某个 ChannelHandler 出现了阻塞,会拖累这个 eventloop 线程所负责的其他请求。 在实际场景中,ChannelHandler里面一般都会执行一些 IO 操作,比如RPC调用,MQ等,无法保证不会出现阻塞的情况。所以很多高性能的分布式框架中都会使用三层处理模型,额外增加一个业务线程池,将耗时的IO操作放在这个业务线程池里面执行,这样就不会阻塞 workerGroup 中的线程了,Mercury 中也是这么设计的。
网关返回的response ,body中的数据统一使用下面的格式
{
"c": "状态码 0:成功 其他:失败",
"d": "消息响应"
}
cmd: 1
header : 无
body:
{
"d": "设备id",
"p": "平台 1: iOS;2: Android; 3: PC",
"sv": "系统版本号",
"u": "用户ID",
"m": "model",
"t": "通道类型 2 : 房间连接",
"b": "app是否在后台 0: 前台, 1: 后台"
}
cmd: 2
body:
{
"type": "appstate",
"data": {
"foreground": true
}
}
cmd: 10
header : 2
key | 说明 |
---|---|
room | 房间id |
uid | 用户id |
identity | 0 : 游客 , 1: 普通用户 |
body: 无
cmd: 11
header : 2
key | 说明 |
---|---|
room | 房间id |
body: 无
cmd: 12
header : 2
key | 说明 |
---|---|
room | 房间id |
ack | 是否 ack 消息, 1 :是 |
header : 3 (批量消息时才有): 批量条数
body:
客户端发送:
{
"messageId": "消息id",
"roomId": "房间id",
"msgFormat":"消息格式 100:业务 101:文本 102:图片 103:表情 104:语音 105:视频 106:ack",
"msgType": "消息类型",
"priority": "消息优先级",
"uid": "发送uid",
"content": "消息内容",
"needToSave": "消息是否保存",
}
cmd: 13
header : 2
key | 说明 |
---|---|
room | 房间id |
uid | 用户id |
signal | 1: 踢人, 2: 关闭房间 |
body:无
cmd: 14
header : 2
key | 说明 |
---|---|
room | 房间id |
body:
{
"messageId": "消息id",
"roomId": "房间id",
"messageType":"消息类型 100:业务 101:文本 102:图片 103:表情 104:语音 105:视频 106:ack",
"msgType": "消息类型",
"priority": "消息优先级",
"uid": "发送uid",
"content": "消息内容"
}
Mercury 同时支持 TCP和Websocket 2种方式,安卓、IOS和 PC 采用的是 TCP 连接,而H5采用的是 Websocket连接。
客户端首先会通过域名DNS解析连接到阿里云的 SLB,SLB会基于轮训的策略将连接分发到不同的 Mercury 机器上,这样子的好处是可以保证Mercury网关集群每台机器上维护的连接数基本一致,能达到相对的均衡。连接建立之后,Mercury会把客户端所有的请求通过MQ的方式转发给对应的业务处理。
对于单个房间来说,在不同mercury 网关上分布的用户连接可能不是均衡的,但是对于整个网关来说,每台机器是保持相对的均衡。
在sona-sdk 初始化的时候会建立连接,进入房间,后续离开、进入其他房间都会复用这个连接。
对于群组来说,群和群之间并不是隔离状态。对于任何群的消息,服务端都需要通过一条长连接通道来进行消息的下推,并不需要通过 进入群聊 来切换多个群的行为。所以,在 App 启动之后,客户端就会立刻建立一条链接,然后会发出 userinfo 这个指令,向mercury上报当前用户信息,不需要再告知自己当前进入了哪些群,而在服务端网关也只会建立一个 用户 -> 连接 (uid -> channelId)的映射。 因此,在群场景中,当某一个群有一条消息发出时,我们会在业务层将这条消息从群维度扇出成 uid 维度,再下发给mercury网关,网关再基于 channelId 向用户推送消息。
每条连接建立的时候,会生成一个唯一的 channelId ,用来标识连接,在channelId 中包含了当前Mercury 机器的 IP 信息。
当发送群组消息的时候,先根据 groupId 查询当前群组的所有群成员 uids,再去 Redis 里面查询群成员的在线状态,这样能拿到了 用户 -> 连接 的映射,再从 channelId 中解析当前用户所在的 mercury 机器 IP 信息,最终将消息投递到MQ中发给对应的网关服务器。
目前sona 群组是有人数限制的,最多就2百人。
上面介绍的群组实现方式,在单房间人数达到上千、上万的时候就不太适合了。对于聊天室场景来说,如果采用这种模式,一个 10w 人的房间,每条消息需要对这个在线状态进行 10w 次查询,这个量级是非常大的,这里必定会成为系统的瓶颈。
所以聊天室采用了另一种方式,房间在线状态本地化。
用户连接随机分散到不同的网关机器上,而每一台 Mercury 服务器上都会在本地内存维护一份房间的在线状态信息,并且订阅一个全局聊天室的消息队列。当然,房间的用户在线信息还是会上报到Redis里面,给业务方去查询在线列表。
举个例子,以 10w 人的房间来说,目前Mercury 线上有 10 台机器,那么平均每台机器上这个房间的用户大概有 10000 人,我们完全没有必要去精准确认这个房间的用户都在哪台网关服务器上,只需要把这个房间的消息都全量通过MQ投递给所有的机器即可,每台机器也只需要在本地维护一个在线状态缓存,最终由网关把消息推送给本机上当前房间内的所有在线用户。
这种方式可以避免每次都要去第三方缓存服务查询在线列表,Mercury 本地内存中维护的就是最实时的房间在线信息,业务方发送聊天室消息,只需要带过来一个 roomId 和 消息体 即可,Mercury 通过 roomId 拿到本地缓存的当前房间的所有连接,再给这些连接推送消息。这样就能大幅提升聊天室消息的下推能力。而且mercury 网关层可以水平扩容,即使房间人数再增加,只需要加机器就能解决了。
像业务上用的比较多的点对点消息,比如给麦上用户打赏等等,也是这种方式,不过就是在本地缓存拿到当前房间的所有连接后,又做了一次uid的过滤。
当然这种方案可能会存在一定的资源浪费, 因为有的机器上可能并没有这个房间的任何用户信息,但还是会消费到MQ的消息,只不过不做任何处理。但这种情况还是比较少的,而且我们的目标本来就是为了让 Mercury 网关层能支持水平扩容,能够支撑海量房间的,对于这些海量房间来说,这个方案整体上收益还是非常大的。
Mercury 房间消息处理的流程
自定义线程池,在当前场景下提供超高的性能,benchmark测试分数比 jdk 原生线程池高出 一到两个数量级
把生产者队列、消费者队列分开,用两个锁去控制同步,当 consumer queue 为空时,且 producer queue 不为空条件满足时,会交换两个队列
详细可见
cn.bixin.sona.gateway.concurrent.FastThreadPool
借鉴 Sentinel 中 LeapArray
的设计,主要是为了计算当前房间的一个发送频率,如果当前房间不是一个高频房间,消息会立即发送出去。
如果是做活动的房间,比如热门的直播间,消息量巨大,为了进一步压榨网关的性能,对这种高频房间的消息发送会做特殊优化,将消息暂存到一个 ringbuffer 中,当达到条件后进行合并压缩,降低网络带宽。
本来是打算使用disruptor 来实现的,但是 disruptor 里面的buffer比较适合buffer 较大、单 producer 的场景,在多 producer 的情况下 write 会比较复杂,性能较差 。它里面会引入了一个与Ring Buffer 大小相同的 buffer:available buffer。 当某个位置写入成功的时候,便把 availble buffer 相应的位置置位,标记为写入成功。 读取的时候,会遍历 available buffer ,来判断元素是否已经就绪。
而我们的场景是 多producer 多 consumer 的情况,如果使用 disruptor那种方式,性能不见得会高多少。
所以自己设计了一个 无锁化的 ringbuffer,buffer size 最大是16,在buffer size 较小、并发 write 的情况下,能提供不错的性能。
详细可见
cn.bixin.sona.gateway.concurrent.buffer.RingBuffer
- buffer size 必须设置成2的幂次方 ,这样可以通过 位运算来计算下标,提升性能
- index使用long类型,即使100万QPS的处理速度,也需要30万年才能用完
- 下标使用
AtomicLongArray
,通过 CacheLine Padding 来解决伪共享问题 - 使用
AtomicReferenceArray
基于 CAS 实现无锁化操作