基于WebSocket和Netty打造聊天服务器

什么是WebSocket

WebSocket是一种基于TCP的通信协议,他在单个TCP连接上提供全双工通道。客户端和服务器建立连接之后,双方可以主动推送数据,而不需要反复握手

你可以设想一下,你和你朋友聊天,传统的HTTP就是对讲机,我说完了之后,你才能说。
WebSocket就像打电话一样,两个人可以自由沟通。

关键特性:一次握手,长期使用,低延迟,轻量级。

好处

传统的HTTP协议是无状态,短链接的,导致实时应用必须轮询。
WebSocket则是

  • 解决实时性需求
  • 减少网络开销
  • 支持双向通信

Spring集成WebSocket,只需要在类上加一个注解即可:

// Spring Boot中,可用@ServerEndpoint注解快速创建WebSocket端点
@ServerEndpoint("/chat")
public class ChatEndpoint {
@OnOpen
public void onOpen(Session session) {
System.out.println("电话接通了!连接ID: " + session.getId());
}
}

注意事项:

  1. WebSocket不能完全代替HTTP!HTTP适合请求-响应式场景,WebSocket专攻双向实时通信。用WebSocket做普通的API调用会增加复杂度
  2. 建立连接后,网络也可能终端,可以实现心跳机制
  3. WebSocket协议本身不加密,必须配合WSS(类似HTTPS)

Netty是什么

如果说WebSocket是电话线,那么Netty就是建造电话网络的工程队。他不介入业务逻辑,而是提供一套高性能,可扩展的通信基础设施。

Netty是一个异步事件驱动的网络框架

好处

  • 原生NIO复杂,需要手动管理线程、选择器、缓冲区
  • 性能高:Netty的Reactor多线程模型(一个线程管理多个连接)

实现

在Spring框架下实现,先启动SpringBoot,端口号8081,把html文件放在resources/templates,然后启动SpringBoot

pom依赖:

<!-- 整合netty开发依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
  1. 首先定义一个启动类,创建一个Netty服务器
public class WebSocketServer {

public static void main(String[] args) throws Exception {
// 定义一对线程组 主线程组 用于接收客户端的连接请求,不做任何处理
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 定义一对线程组 从线程组 主线程组会把任务丢给从线程组,让从线程组去处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// netty服务器的创建,ServerBootstrap是一个启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) // 设置主从线程组
.channel(NioServerSocketChannel.class) // 设置NIO双向通道类型
.childHandler(new WebSocketServerInitializer()); // 子处理器,用于处理workerGroup
// 这个WebSocketServerInitializer我们会创建
// 启动server,绑定8088端口启动,并且同步等待方式启动
ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();

// 监听关闭的channel, 设置为同步的方式
channelFuture.channel().closeFuture().sync();
} finally {
// 关闭我们的主线程组和从线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  1. 定义一个初始化类
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 通过SocketChannel去获取对应的管道pipeline
ChannelPipeline pipeline = channel.pipeline();

/**
* HTTP编解码器
* 作用:
* - 入站:将字节流解码为HttpRequest对象
* - 出站:将HttpResponse对象编码为字节流
* - 处理WebSocket握手时的HTTP请求
*/
pipeline.addLast("HttpServerCodec", new HttpServerCodec());

/**
* 分块写处理器
* 作用:
* - 支持大文件的分块传输
* - 避免大数据一次性写入造成内存溢出
* - 对于WebSocket的大信息很有用
*/
pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());

/**
* HTTP消息聚合器
* 几乎在netty的编程中,都会用到这个handler
* 作用:
* - 将分片的HTTP消息聚合成完整的FullHttpRequest/FullHttpResponse
* - 参数1024*64表示最大聚合64KB的信息
* - WebSocket握手需要完整的HTTP消息
*/
pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(1024*64));

// ======================== 以上用于支持http协议 ================================

/**
* websocket服务器处理的协议
* 作用
* - 处理WebSocket握手协议
* - 验证WebSocket升级请求
* - 处理WebSocket帧的编解码
* - 参数“/ws”指定WebSocket的访问路径(核心)
*/
pipeline.addLast("WebSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));

/**
* 自定义的handler
* 作用:
* - 处理具体的聊天业务逻辑
* - 接收WebSocket消息并广播
* - 管理客户端链接
*/
pipeline.addLast(new ChatHandler());
}
}
  1. 定义一个处理器(handler)
/**
* 处理消息的handler
* TextWebSocketFrame:在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

// 用于记录和管理所有客户端的ChannelGroup
// ChannelGroup是Netty提供的一个集合接口,专门用来管理多个Channel,可以对集合中的所有Channel进行批量操作(群发消息、关闭连接等)
// DefaultChannelGroup是ChannelGroup的默认实现,内部用ConcurrentHashMap来存储Channel,线程安全。提供了添加、删除、广播等功能
// GlobalEventExecutor.INSTANCE是一个全局事件执行器,
// 1. 他是一个单例的EventExecutor
// 2. 用于执行ChannelGroup的异步操作
// 3. 当你调用了clients.writeAndFlush()时,实际的写操作会在这个执行器中进行
// 4. 保证了线程安全和异步执行
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 获取客户端传输过来的消息
String content = msg.text();
System.out.println("接收到的数据:" + content);

// 断点B:观察clients集合中有多少个连接
System.out.println("当前连接数:" + clients.size());

for(Channel channel : clients) {
// 不能直接writeAndFlush收到的content字符串信息,必须封装到frame载体中输出
channel.writeAndFlush(new TextWebSocketFrame("系统消息:" + content));
}
// clients.writeAndFlush(new TextWebSocketFrame("系统消息:" + content));

}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 当客户端连接服务端之后,获取客户端的channel,并且放到ChannelGroup中去进行管理
clients.add(ctx.channel());
System.out.println("新客户端连接:" + ctx.channel().id().asShortText());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 这步是多余的,当断开连接时候ChannelGroup会自动移除对应的channel
clients.remove(ctx.channel());
System.out.println("客户端断开:" + ctx.channel().id().asLongText());
}
}

  1. 简单写一个前端进行测试(index.html)
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Netty-Websocket</title>
<script type="text/javascript">
let socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8088/ws");
socket.onmessage = function(event){
let textarea = document.getElementById('responseText');
textarea.value += event.data+"\r\n";
};
socket.onopen = function(event){
let textarea = document.getElementById('responseText');
textarea.value = "Netty-WebSocket服务器。。。。。。连接 \r\n";
};
socket.onclose = function(event){
let textarea = document.getElementById('responseText');
textarea.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
};
} else {
alert("您的浏览器不支持WebSocket协议!");
}

function send(){
if(!window.WebSocket){return;}
if(socket.readyState === WebSocket.OPEN) {
let message = document.getElementById('message').value;
socket.send(message);
} else {
alert("WebSocket 连接没有建立成功!");
}

}

</script>
</head>
<body>
<form onSubmit="return false;">
<label>文本</label><input type="text" id="message" name="message" placeholder="这里输入消息" /> <br />
<br /> <input type="button" value="发送ws消息"
onClick="send()" />
<hr color="black" />
<h3>服务端返回的应答消息</h3>
<textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

代码详解

  1. 首先我们刚启动我们的启动类时,程序会先走走WebSocketServermain方法,
    然后会创建一个ServerBootstrap对象,然后设置ServerBootstrap对象,然后启动服务。
  2. 我们去浏览器中访问http://localhost:8081/ 这个时候会打开index.html的界面。
  3. 静态页面中的js会判断当前浏览器是否支持WebSocket,如果支持的话,会执行 socket = new WebSocket(“ws://localhost:8088/ws”);
  4. 浏览器向Netty服务器(8088)发起WebSocket握手请求
  5. Netty服务器处理握手,此时会到WebSocketServerInitializer中,通过channel中的pipeline去进行一系列操作:
客户端数据 → HttpServerCodec → ChunkedWriteHandler → HttpObjectAggregator → WebSocketServerProtocolHandler → ChatHandler → 处理完成

执行到我们自定义的handler:pipeline.addLast(new ChatHandler());时,会先创建静态代码块(详解见代码)
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

然后再调用handlerAdded()方法,将当前连接的channel添加到ChannelGroup中。

  1. 发送消息时,浏览器会携带message发送socket.send(message); 这个请求 。后端会调用handlerRead0()方法,将message数据发送给所有连接的channel。
  2. 当某个浏览器关闭时,会调用handlerRemoved()方法,将channel从ChannelGroup中移除。(但其实浏览器关闭就相当于自动断开连接了)