Netty基本使用
官方的介绍:Netty isNetty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
一、Netty概述
官方的介绍:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
二、为什么使用Netty
- API使用简单,学习成本低。
- 功能强大,内置了多种解码编码器,支持多种协议。
- 性能高,对比其他主流的NIO框架,Netty的性能最优。
- 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
- Dubbo、Elasticsearch都采用了Netty,质量得到验证。
三、Netty核心组件
1.Bootstrap/ServerBootstrap【启动器】
Bootstrap 是“引导”的意思,它主要负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。
PS:Netty 中的引导器共分为两种类型:一个为用于客户端引导的 Bootstrap,另一个为用于服务端引导的 ServerBootStrap。
2.Channel【通道】
Channel 是网络数据的传输通道,它代表了到实体(如硬件设备、文件、网络套接字或能够执行 I/O 操作的程序组件)的开放连接,如读操作和写操作。
Channel 提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。
常见的 Channel 类型有以下几个:
- NioServerSocketChannel 异步 TCP 服务端。
- NioSocketChannel 异步 TCP 客户端。
- OioServerSocketChannel 同步 TCP 服务端。
- OioSocketChannel 同步 TCP 客户端。
- NioDatagramChannel 异步 UDP 连接。
- OioDatagramChannel 同步 UDP 连接。
当然 Channel 也会有多种状态,如连接建立、连接注册、数据读写、连接销毁等状态。
3.EventLoopGroup/EventLoop【事件循环器】
EventLoopGroup 是一个处理 I/O 操作和任务的线程组。在 Netty 中,EventLoopGroup 负责接受客户端的连接,以及处理网络事件,如读/写事件。它包含多个 EventLoop,每个 EventLoop 包含一个 Selector 和一个重要的组件,用于处理注册到其上的 Channel 的所有 I/O 事件
3.1 EventLoopGroup、EventLoop和Channel
它们三者的关系如下:
- 一个 EventLoopGroup 往往包含一个或者多个 EventLoop。EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件,如 accept、connect、read、write 等 I/O 事件。
- EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
- 每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。
3.2 线程模型
Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:
- 单线程模型:EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
- 多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
- 主从多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。
4.ChannelHandler【通道处理器】
最核心的组件之一
ChannelHandler 支持很多协议,并且提供用于数据处理的容器。我们已经知道 ChannelHandler 由特定事件触发。 ChannelHandler 可专用于几乎所有的动作,包括将一个对象转为字节(或相反),执行过程中抛出的异常处理。
常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑。当你需要提供响应时,你也可以从 ChannelInboundHandler 冲刷数据。一句话,业务逻辑经常存活于一个或者多个 ChannelInboundHandler。


我们通常使用其子类实现类做一些数据处理,如ChannelInboundHandlerAdapter、SimpleChannelInboundHandler,我们只需要集成这些类,实现其方法即可
5.ChannelPipeline【通道管道】
ChannelPipeline 是 ChannelHandler 的容器,提供了一种方式,以链式的方式组织和处理跨多个 ChannelHandler 之间的交互逻辑。当数据在管道中流动时,它会按照 ChannelHandler 的顺序被处理。
ChannelPipeline还提供了方便的操作方法,如添加、移除和替换ChannelHandler。
addFirst、addLast、addBefore、remove......
四、使用Netty实现一个简单的聊天系统
1.添加依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.107.Final</version>
</dependency>
2.创建服务端启动类
@Slf4j
@Configuration
public class NettyWebSocketServer {
public static final int WEB_SOCKET_PORT = 8090;
public static final NettyWebSocketServerHandler NETTY_WEB_SOCKET_SERVER_HANDLER = new NettyWebSocketServerHandler();
// 创建线程池执行器
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());
/**
* 启动
*/
@PostConstruct
public void start() throws InterruptedException {
run();
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}
public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
//30秒客户端没有向服务器发送心跳则关闭连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接,需要客户端一直发送心跳
pipeline.addLast(new IdleStateHandler(30, 0, 0));
pipeline.addLast(new HttpHeadersHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
pipeline.addLast(NETTY_WEB_SOCKET_SERVER_HANDLER);
}
});
// 启动服务器
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}
}
2.创建服务端处理器
ChannelInboundHandlerAdapter实现
//简单处理
public class HttpHeadersHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//客户端连接时有请求
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
HttpHeaders headers = request.headers();
String token = headers.get("token");
//token校验
if(!isVaildToken(token)){
return;
}
NettyUtil.setToken( token);
//连接时候处理token ,处理完成后移除这个处理器,防止发消息重复校验
ctx.pipeline().remove(this);
ctx.fireChannelRead(request);
}else
{
ctx.fireChannelRead(msg);
}
}
private boolean isVaildToken(String token) {
//校验逻辑
return true;
}
}
SimpleChannelInboundHandler<TextWebSocketFrame>实现
@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private WebSocketService webSocketService;
// 当web客户端连接后,触发该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.webSocketService = getService();
}
// 客户端离线
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
userOffLine(ctx);
}
/**
* 取消绑定
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
userOffLine(ctx);
}
private void userOffLine(ChannelHandlerContext ctx) {
this.webSocketService.removed(ctx.channel());
ctx.channel().close();
}
/**
* 心跳检查
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//检测到客户端的"读空闲"状态,即服务器未从客户端接收到任何数据达到指定的超时时间。--断开连接
// pipeline.addLast(new IdleStateHandler(30, 0, 0)) 设置了30s定时检测一次,如果30s内没有读数据,则触发IdleStateEvent事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 读空闲
if (idleStateEvent.state() == IdleState.READER_IDLE) {
// 关闭用户的连接
userOffLine(ctx);
}
}
// 握手完成(HandshakeComplete)---存储授权用户
else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
this.webSocketService.connect(ctx.channel(),NettyUtil.getToken());
}
super.userEventTriggered(ctx, evt);
}
// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
private WebSocketService getService() {
return SpringUtil.getBean(WebSocketService.class);
}
// 读取客户端发送的请求报文
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
log.info("收到客户端消息:{}", msg.text());
}
}
3.创建消息处理服务类
public interface WebSocketService {
void connect(Channel channel,String token);
void removed(Channel channel);
void sendToAllOnline(String message,String currentUid);
void sendToAOnlineUid(String message,String uid);
}
@Component
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {
//所有在线连接
private static final ConcurrentHashMap<Channel, String> ONLINE_UID_MAP = new ConcurrentHashMap<>();
@Override
public void connect(Channel channel,String token) {
//解析token获取uid
String uid = analysisToken(token);
ONLINE_UID_MAP.put(channel,uid);
}
private String analysisToken(String token) {
return token;
}
@Override
public void removed(Channel channel) {
ONLINE_UID_MAP.remove(channel);
}
@Override
public void authorize(Channel channel, WSAuthorize wsAuthorize) {
}
@Override
public void sendToAllOnline(String message,String currentUid) {
ONLINE_UID_MAP.forEach((channel, uid) -> {
if (Objects.nonNull(currentUid) && Objects.equals(uid, currentUid)) {
return;
}
sendMsg(channel,message);
});
}
@Override
public void sendToAOnlineUid(String message, String uid) {
ONLINE_UID_MAP.forEach((channel, sendUid) -> {
if (!StrUtil.isBlank(sendUid) && Objects.equals(uid, sendUid)) {
sendMsg(channel,message);
}
});
}
private void sendMsg(Channel channel,String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
4.简单使用测试
@PostMapping("/sendMsg")
public Boolean sendMsg(@RequestBody Map<String, Object> params, HttpServletRequest request) {
String type = params.get("type").toString();
//简易模拟获取uid
String header = request.getHeader("token");
String currentUid = this.getUid(header);
//群发
if (Objects.equals("all", type)){
webSocketService.sendToAllOnline(params.get("message").toString(),currentUid);
}
//单发
else if (Objects.equals("user", type)){
webSocketService.sendToAOnlineUid(params.get("message").toString(),params.get("uid").toString());
}
return true;
}
private String getUid(String header) {
return header;
}
开启两个客户端,群发,单发

1客户端

2客户端

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)