一、Netty概述

官方的介绍:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端
 

二、为什么使用Netty

  • API使用简单,学习成本低。
  • 功能强大,内置了多种解码编码器,支持多种协议。
  • 性能高,对比其他主流的NIO框架,Netty的性能最优。
  • 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
  • DubboElasticsearch都采用了Netty,质量得到验证。

三、Netty核心组件

1.Bootstrap/ServerBootstrap【启动器】

Bootstrap 是“引导”的意思,它主要负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。

PS:Netty 中的引导器共分为两种类型:一个为用于客户端引导的 Bootstrap,另一个为用于服务端引导的 ServerBootStrap。

2.Channel【通道】

Channel 是网络数据的传输通道,它代表了到实体(如硬件设备、文件、网络套接字或能够执行 I/O 操作的程序组件)的开放连接,如读操作和写操作。

Channel 提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。

常见的 Channel 类型有以下几个:

  • NioServerSocketChannel 异步 TCP 服务端。
  • NioSocketChannel 异步 TCP 客户端。
  • OioServerSocketChannel 同步 TCP 服务端。
  • OioSocketChannel 同步 TCP 客户端。
  • NioDatagramChannel 异步 UDP 连接。
  • OioDatagramChannel 同步 UDP 连接。

当然 Channel 也会有多种状态,如连接建立、连接注册、数据读写、连接销毁等状态。

3.EventLoopGroup/EventLoop【事件循环器】

EventLoopGroup 是一个处理 I/O 操作和任务的线程组。在 Netty 中,EventLoopGroup 负责接受客户端的连接,以及处理网络事件,如读/写事件。它包含多个 EventLoop,每个 EventLoop 包含一个 Selector 和一个重要的组件,用于处理注册到其上的 Channel 的所有 I/O 事件

3.1 EventLoopGroup、EventLoop和Channel

它们三者的关系如下:

  1. 一个 EventLoopGroup 往往包含一个或者多个 EventLoop。EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件,如 accept、connect、read、write 等 I/O 事件。
  2. EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。
  3. 每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。

3.2 线程模型

Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:

  1. 单线程模型:EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
  2. 多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;
  3. 主从多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。

4.ChannelHandler【通道处理器】

最核心的组件之一

ChannelHandler 支持很多协议,并且提供用于数据处理的容器。我们已经知道 ChannelHandler 由特定事件触发。 ChannelHandler 可专用于几乎所有的动作,包括将一个对象转为字节(或相反),执行过程中抛出的异常处理。

常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑。当你需要提供响应时,你也可以从 ChannelInboundHandler 冲刷数据。一句话,业务逻辑经常存活于一个或者多个 ChannelInboundHandler。

我们通常使用其子类实现类做一些数据处理,如ChannelInboundHandlerAdapter、SimpleChannelInboundHandler,我们只需要集成这些类,实现其方法即可

5.ChannelPipeline【通道管道】

ChannelPipeline 是 ChannelHandler 的容器,提供了一种方式,以链式的方式组织和处理跨多个 ChannelHandler 之间的交互逻辑。当数据在管道中流动时,它会按照 ChannelHandler 的顺序被处理。

ChannelPipeline还提供了方便的操作方法,如添加、移除和替换ChannelHandler。

addFirst、addLast、addBefore、remove......

四、使用Netty实现一个简单的聊天系统

1.添加依赖:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.107.Final</version>
        </dependency>

2.创建服务端启动类



@Slf4j
@Configuration
public class NettyWebSocketServer {
    public static final int WEB_SOCKET_PORT = 8090;
    public static final NettyWebSocketServerHandler NETTY_WEB_SOCKET_SERVER_HANDLER = new NettyWebSocketServerHandler();
    // 创建线程池执行器
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

    /**
     * 启动
     */
    @PostConstruct
    public void start() throws InterruptedException {
        run();
    }

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() {
        Future<?> future = bossGroup.shutdownGracefully();
        Future<?> future1 = workerGroup.shutdownGracefully();
        future.syncUninterruptibly();
        future1.syncUninterruptibly();
        log.info("关闭 ws server 成功");
    }

    public void run() throws InterruptedException {
        // 服务器启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
                //30秒客户端没有向服务器发送心跳则关闭连接
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接,需要客户端一直发送心跳                   
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));                      
                        pipeline.addLast(new HttpHeadersHandler());
                        pipeline.addLast(new WebSocketServerProtocolHandler("/"));
                      
                        pipeline.addLast(NETTY_WEB_SOCKET_SERVER_HANDLER);
                    }
                });
        // 启动服务器
        serverBootstrap.bind(WEB_SOCKET_PORT).sync();
    }

}

2.创建服务端处理器

ChannelInboundHandlerAdapter实现

//简单处理

public class HttpHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //客户端连接时有请求
        if (msg instanceof FullHttpRequest) {

            FullHttpRequest request = (FullHttpRequest) msg;
            HttpHeaders headers = request.headers();
            String token = headers.get("token");
            //token校验
            if(!isVaildToken(token)){
              return;
            }
            NettyUtil.setToken( token);
            //连接时候处理token ,处理完成后移除这个处理器,防止发消息重复校验
            ctx.pipeline().remove(this);
            ctx.fireChannelRead(request);
        }else
        {
            ctx.fireChannelRead(msg);
        }
    }

    private boolean isVaildToken(String token) {
       //校验逻辑
        return true;
    }
}

SimpleChannelInboundHandler<TextWebSocketFrame>实现


@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private WebSocketService webSocketService;

    // 当web客户端连接后,触发该方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     
        this.webSocketService = getService();
    }

    // 客户端离线
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        userOffLine(ctx);
    }

    /**
     * 取消绑定
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      
        userOffLine(ctx);
    }

    private void userOffLine(ChannelHandlerContext ctx) {
        this.webSocketService.removed(ctx.channel());
        ctx.channel().close();
    }

    /**
     * 心跳检查
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //检测到客户端的"读空闲"状态,即服务器未从客户端接收到任何数据达到指定的超时时间。--断开连接
        // pipeline.addLast(new IdleStateHandler(30, 0, 0)) 设置了30s定时检测一次,如果30s内没有读数据,则触发IdleStateEvent事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            // 读空闲
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                // 关闭用户的连接
                userOffLine(ctx);
            }
        }
       // 握手完成(HandshakeComplete)---存储授权用户
        else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            this.webSocketService.connect(ctx.channel(),NettyUtil.getToken());
        }
        super.userEventTriggered(ctx, evt);
    }

    // 处理异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       
        ctx.channel().close();
    }

    private WebSocketService getService() {
        return SpringUtil.getBean(WebSocketService.class);
    }

    // 读取客户端发送的请求报文
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("收到客户端消息:{}", msg.text());
    }
}

3.创建消息处理服务类



public interface WebSocketService {



    void connect(Channel channel,String token);


    void removed(Channel channel);



    void sendToAllOnline(String message,String currentUid);

    void sendToAOnlineUid(String message,String uid);
}


@Component
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {


    //所有在线连接
    private static final ConcurrentHashMap<Channel, String> ONLINE_UID_MAP = new ConcurrentHashMap<>();

    @Override
    public void connect(Channel channel,String token) {
        //解析token获取uid
        String uid = analysisToken(token);
        ONLINE_UID_MAP.put(channel,uid);
    }

    private String analysisToken(String token) {
        return token;
    }

    @Override
    public void removed(Channel channel) {
        ONLINE_UID_MAP.remove(channel);
    }

    @Override
    public void authorize(Channel channel, WSAuthorize wsAuthorize) {

    }

    @Override
    public void sendToAllOnline(String message,String currentUid) {
        ONLINE_UID_MAP.forEach((channel, uid) -> {
            if (Objects.nonNull(currentUid) && Objects.equals(uid, currentUid)) {
                return;
            }
            sendMsg(channel,message);
        });
    }

    @Override
    public void sendToAOnlineUid(String message, String uid) {
        ONLINE_UID_MAP.forEach((channel, sendUid) -> {
            if (!StrUtil.isBlank(sendUid) && Objects.equals(uid, sendUid)) {
                sendMsg(channel,message);
            }
        });
    }

    private void sendMsg(Channel channel,String msg) {
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }


}

4.简单使用测试

    @PostMapping("/sendMsg")
    public Boolean sendMsg(@RequestBody Map<String, Object> params, HttpServletRequest request) {
        String type = params.get("type").toString();
        //简易模拟获取uid
        String header = request.getHeader("token");
        String currentUid = this.getUid(header);
        //群发
        if (Objects.equals("all", type)){
            webSocketService.sendToAllOnline(params.get("message").toString(),currentUid);
        }
        //单发
        else if (Objects.equals("user", type)){
            webSocketService.sendToAOnlineUid(params.get("message").toString(),params.get("uid").toString());
        }
        return true;
    }

    private String getUid(String header) {
        return header;
    }

开启两个客户端,群发,单发

1客户端

2客户端

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐