Based on Netty network framework and Springboot framework.
The target is let easy to build a network cluster server. Minimize the use of third-party dependency libraries as much as possible.
- Supports multiple network protocols, including TCP, UDP, HTTP, and WEBSOCKET and MQTT; Multiple protocols can be configured on the same port.
- Supports multi-node clusters and dynamic node joining and exiting
-
Supports message forwarding between internal and external networks
-
Supports distributed transactions
server-message-queue
module
-
server-framework
: Thecore module
of the framework, which provides the basic configuration of the framework, and the configuration of the network protocol, the message forwarding, and the message processing. -
server-database
: Thedatabase
module of the framework, useMybatis
andMySQL
to implement the database operation. If you need to use a MySQL database, you can use it.that is optional
. -
server-cache
: Thecache
module of the framework, useRedisson
to implement theRedis
cache operation, and useCaffeine
to implement the local cache operation. If you need to use a caching module, you can use it.that is optional
. -
server-permission
: Thepermission
module of the framework, UseSpring AOP
and annotations to control interface permissions. The HTTP protocol uses the request header entrainment JWT method, thepermissions
field in theUserSession
is used for TCP and UDP and WebSocket. If you need to use apermission
module, you can use it.that is optional
. -
server-access-control
: Theaccess control
module of the framework, UseSpring AOP
and annotations andGuava library
to limit the rate of the interface. You can limit the rate of interfaces uniformly or based on user granularity, and configure IP blacklists and whitelists. If you need to use aaccess control
module, you can use it.that is optional
. -
server-message-queue
: Themessage queue
module of the framework, Use theserver-framework
module to develop a message queue module that supports message subscription and message push. If you need to use amessage queue
module, you can use it.that is optional
. -
gateway-server
、hall-server
、room-server
: These are threesample nodes
of the cluster, and you can refer to their configurations to get your own cluster
- You must had installed JDK 1.8+ and Maven 3.x
- Switch to the project directory and run
mvn clean install
- Add the
server-framework
dependency in your project
<dependency>
<groupId>com.hbsoo</groupId>
<artifactId>server-framework</artifactId>
<version>1.0.0</version>
</dependency>
- Add the
server-framework
Configuration to your Springboot projectapplication.yml
file.
hbsoo:
server:
tcpHeader: THBS # TCP header
udpHeader: UHBS # UDP header
id: 1000 #Current node id
threadPoolSize:
insideClient: 5 #inside client side business thread pool size
insideServer: 5 #inside server side business thread pool size
outsideServer: 5 #outside server side business thread pool size
outsideServer:
enable: true #Whether to enable the outside server
port: 5555 #Outside server port
protocol: "TCP,UDP,WEBSOCKET,HTTP,MQTT" #Outside server protocol,Which protocols to use.
insideServers:
- host: 192.168.1.104
port: 6000
type: gateway #Inside server type,that's namespace customized
clientSize: 1 #Connect to inside server client size
weight: 10 #Inside server weight
id: 1000 #Inside server id; At least one id in the list of insideServers is associated with the current node id
- host: 192.168.1.104
port: 6003
type: hall
clientSize: 1
id: 2000
- host: 192.168.1.104
port: 6006
type: room
clientSize: 1
id: 3000
- Define the HTTP message handler as follows
@OutsideMessageHandler(value = 0, uri = "/index", protocol = Protocol.HTTP)
public class IndexAction extends HttpServerMessageDispatcher {
@Override
public void handle(ChannelHandlerContext ctx, HttpPacket httpPacket) {
List<String> genealogies = new ArrayList<>();
genealogies.add("zun");
responseJson(httpPacket, genealogies);
forward2InsideServerUseSender(
NetworkPacket.Builder.withDefaultHeader()
.msgType(100).writeStr(genealogies.toString()),
"hall",
"",3);
QueueMessageSender.publish("hall", "test", genealogies.toString());
}
}
- Define the WEBSOCKET message handler as follows
@OutsideMessageHandler(value = 100, protocol = Protocol.WEBSOCKET)
public class LoginChatRoomAction extends ServerMessageDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LoginChatRoomAction.class);
@Override
public void handle(ChannelHandlerContext ctx, NetworkPacket.Decoder decoder) {
String username = decoder.readStr();
String channelId = decoder.readStr();
int userId = Math.abs(username.hashCode());
logger.info("login chat room username:{},channelId:{},userId:{}", username, channelId, userId);
//notify client login success
NetworkPacket.Builder builder = decoder.toBuilder().writeInt(userId).writeStr(Permission.USER.name());
builder.sendTcpTo(ctx.channel());
//forward to room server
forward2insideServerUseSender(builder, "room", userId);
}
@Override
public Object threadKey(ChannelHandlerContext ctx, NetworkPacket.Decoder decoder) {
return decoder.readStr();
}
}
- Client side network packet and server side network packet as follows
- Define the transaction queue message handler at sender side as follows
@MessageListener(topic = "test", serverType = "hall")
public class MessageQueueTest implements TransactionQueueMessageSenderHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageQueueTest.class);
@Override
public void handleCallback(CallbackMessage callbackMessage) {
logger.info("callbackMessage = {}", callbackMessage);
}
@Override
public int consumerSize() {
return 2;
}
@Override
public boolean handle(Long msgId, String objJson) {
logger.debug("handle msgId = {},objJson = {}", msgId, objJson);
return true;
}
@Override
public boolean rollback(Long msgId, String objJson) {
logger.debug("rollback msgId = {},objJson = {}", msgId, objJson);
return true;
}
}
- Define the transaction queue message handler at receiver side as follows
@MessageListener(topic = "test", serverType = "hall")
public class MessageQueueTest implements TransactionQueueMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageQueueTest.class);
@Override
public boolean handle(Long msgId, String objJson) {
logger.debug("handle msgId = {},objJson = {}", msgId, objJson);
return false;
}
@Override
public boolean rollback(Long msgId, String objJson) {
logger.debug("rollback msgId = {},objJson = {}", msgId,objJson);
return false;
}
}
- Publish the transaction queue message as follows
@OutsideMessageHandler(value = 0, uri = "/index", protocol = Protocol.HTTP)
public class IndexAction extends HttpServerMessageDispatcher {
@Override
public void handle(ChannelHandlerContext ctx, HttpPacket httpPacket) {
List<String> genealogies = new ArrayList<>();
genealogies.add("zun");
responseJson(httpPacket, genealogies);
// publish transaction queue message
QueueMessageSender.publish("hall", "test", genealogies.toString());
}
}
- You must had installed JDK 1.8+ and Maven 3.x
- Clone the project and import it into your IDE
- Run the project
- Jvm configuration parameters as follows
-Xms512m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError
- The interface tested was IndexAction, as follows
@PermissionAuth(permission = {})
//@AccessLimit(userRateSize = 1, globalRateSize = 2)
@OutsideMessageHandler(value = 0, uri = "/index", protocol = Protocol.HTTP)
public class IndexAction extends HttpServerMessageDispatcher {
@Autowired
private IGenealogyService genealogyService;
@Override
public void handle(ChannelHandlerContext ctx, HttpPackage httpPacket) {
final List<Genealogy> genealogies = genealogyService.listAll();
//System.out.println("genealogies = " + genealogies);
responseJson(ctx, httpPacket, genealogies);
forward2insideServerUseSender(
NetworkPacket.Builder.withDefaultHeader()
.msgType(100).writeStr(genealogies.toString()),
"hall",
"",3);
QueueMessageSender.publish("hall", "test", genealogies.toString());
}
@Override
public Object threadKey(ChannelHandlerContext ctx, NetworkPacket.Decoder decoder) {
return null;
}
}
- The stress test tool is
jmeter
, and the configuration is as follows
- Test result
调试资源泄露,启动时添加:-Xms12m -Xmx12m -Dio.netty.leakDetection.level=paranoid
内网不同协议的转发外网消息转发到内网处理器中外部用户登录用户消息内网转发用户消息外网转发群组消息内网转发群组消息外网转发serverType写在framework中需要解决服务端、客户端消息转发待测试延迟线程池装配,任务执行完之后,处理逻辑http协议抽离完善readmemysql、mybatis、redis、mq等配置本地缓存~~~~,缓存失效时间,缓存失效时,重新查询数据库请求限流内网消息队列和失败重发机制,添加权重转发机制为避免长时间占用链接,没有登录的链接,服务端添加心跳检测机制,如果超过一定时间没有收到心跳,则主动断开链接外网支持协议配置化内网消息重组,支持延迟消息,可靠消息(保证送达与幂等性)分布式事务?内网服务器登录,将已登录的session同步给登录服务器内网服务登出,同步消息给登录服务器用户登录接口抽离由使用框架者实现,同时还有用户登录内网同步和登出内网同步接口内网消息不可达时,可选路由到相同类型的其他节点- 消息转发,重复发送检测
MQTT协议支持?协议头配置化?- 注册接口?
接口权限控制?做一个im群组聊天室,测试框架完善度让内部服务客户端发送同步消息(客户端发送等待服务端返回消息)重构NetworkPacket,body部分改成rawBody、extendBody,方便扩展udp协议问题修复,消息体改版后其他模块的适配消息解码时,添加消息体长度校验:java.lang.OutOfMemoryError: Java heap space转发延迟时间单位可选