组件一、channelProvider

为netty客户端提供可用的通道的组件,本质上就是维护了一个channelMap,为了线程安全,这里用的是concurrentHashMap(有点连接池的意思,有状态感知获取channel时判断channel状态)

1、get方法:

根据传入的服务地址(如127.0.0.1:9980)查找对应的channel,如果channel已经关闭或者不可用就清理缓存避免使用无效连接发生异常

2、set方法:

新建连接时调用,将建立的channel连接缓存起来,避免频繁连接导致性能问题

3、remove:

一般是断开或者异常时调用,确保下次连接时重新建立连接而不是复用异常的连接

存在不足:

1、可以添加心跳机制来检测channel是否健康,自动清理死连接

@Slf4j

public class ChannelProvider {

    private final Map<String, Channel> channelMap;

    public ChannelProvider() {

        channelMap = new ConcurrentHashMap<>();

    }

    public Channel get(InetSocketAddress inetSocketAddress) {

        String key = inetSocketAddress.toString();

        // determine if there is a connection for the corresponding address

        if (channelMap.containsKey(key)) {

            Channel channel = channelMap.get(key);

            // if so, determine if the connection is available, and if so, get it directly

            if (channel != null && channel.isActive()) {

                return channel;

            } else {

                channelMap.remove(key);

            }

        }

        return null;

    }

    public void set(InetSocketAddress inetSocketAddress, Channel channel) {

        String key = inetSocketAddress.toString();

        channelMap.put(key, channel);

    }

    public void remove(InetSocketAddress inetSocketAddress) {

        String key = inetSocketAddress.toString();

        channelMap.remove(key);

        log.info("Channel map size :[{}]", channelMap.size());

    }

}

组件二、UnprocessedRequests

这个组件是用来缓存尚未被处理的completableFuture的,本质上也是在维护一个ConcurrentHashMap,实现了一个异步非阻塞的通信模型,

1、put方法,客户端发送请求后调用这个方法,把请求ID和CompletableFuture缓存起来,等待服务端响应

2、complete方法,当客户端收到服务端响应时调用这个方法,根据请求ID找到对应的future方法,调用future的complete方法来触发一个回调,同时从这个缓存中移除future防止内存泄露

存在不足:

1、未添加超时机制,如果服务端一直不响应,future就会一直阻塞在那里,导致内存泄露和线程的阻塞

public class UnprocessedRequests {

    private static final Map<String, CompletableFuture<RpcResponse<Object>>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>();

    public void put(String requestId, CompletableFuture<RpcResponse<Object>> future) {

        UNPROCESSED_RESPONSE_FUTURES.put(requestId, future);

    }

    public void complete(RpcResponse<Object> rpcResponse) {

        CompletableFuture<RpcResponse<Object>> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId());

        if (null != future) {

            future.complete(rpcResponse);

        } else {

            throw new IllegalStateException();

        }

    }

}

组件三、Netty客户端

netty客户端,负责初始化、连接服务端、发送请求、管理连接复用,并处理异步调用逻辑。

1、doconnect:

异步连接服务端,监听连接成功事件,成功后将channel放入CompletableFuture中

2、sendRpcRequest

从请求中获取服务地址,根据服务发现拿到服务的channel(自动连接或者复用已有的连接),将请求ID和channel信息存入map等待响应

如果找到了连接的channel就发送rpcMessage,下面的写法是典型的netty异步发送消息+回调处理的写法,异步发送一个消息到channel并刷新缓冲区(即发送数据到服务端),监听到发送成功了就打印记录日志,如果失败就发送失败原因

3、getchannel:

从map中获取活跃的连接,具体看之前channelProvider组件的实现

4、close:

优雅地关闭eventLoopGroup,这个函数不会立即中断正在执行的任务,netty会将每个eventLoop标记为“立即关闭”,不再接受新的任务并继续执行队列中剩余的异步任务,最终关闭所有eventLoop及背后的线程,并释放相关资源如selector和socket连接等

存在不足:

1、没有实现重试机制,比如我通信连接失败了,那是不是可以重试几次这个channel再去删除

@Slf4j

public final class NettyRpcClient implements RpcRequestTransport {

    private final ServiceDiscovery serviceDiscovery;

    private final UnprocessedRequests unprocessedRequests;

    private final ChannelProvider channelProvider;

    private final Bootstrap bootstrap;

    private final EventLoopGroup eventLoopGroup;

    public NettyRpcClient() {

        // initialize resources such as EventLoopGroup, Bootstrap

        eventLoopGroup = new NioEventLoopGroup();

        bootstrap = new Bootstrap();

        bootstrap.group(eventLoopGroup)

                .channel(NioSocketChannel.class)

                .handler(new LoggingHandler(LogLevel.INFO))

                //  The timeout period of the connection.

                //  If this time is exceeded or the connection cannot be established, the connection fails.

                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)

                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override

                    protected void initChannel(SocketChannel ch) {

                        ChannelPipeline p = ch.pipeline();

                        // If no data is sent to the server within 15 seconds, a heartbeat request is sent

                        p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));

                        p.addLast(new RpcMessageEncoder());

                        p.addLast(new RpcMessageDecoder());

                        p.addLast(new NettyRpcClientHandler());

                    }

                });

        this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension(ServiceDiscoveryEnum.ZK.getName());

        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);

        this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);

    }

    /**

     * connect server and get the channel ,so that you can send rpc message to server

     *

     * @param inetSocketAddress server address

     * @return the channel

     */

    @SneakyThrows

    public Channel doConnect(InetSocketAddress inetSocketAddress) {

        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();

        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {

            if (future.isSuccess()) {

                log.info("The client has connected [{}] successful!", inetSocketAddress.toString());

                completableFuture.complete(future.channel());

            } else {

                throw new IllegalStateException();

            }

        });

        return completableFuture.get();

    }

    @Override

    public Object sendRpcRequest(RpcRequest rpcRequest) {

        // build return value

        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();

        // get server address

        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);

        // get  server address related channel

        Channel channel = getChannel(inetSocketAddress);

        if (channel.isActive()) {

            // put unprocessed request

            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);

            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)

                    .codec(SerializationTypeEnum.HESSIAN.getCode())

                    .compress(CompressTypeEnum.GZIP.getCode())

                    .messageType(RpcConstants.REQUEST_TYPE).build();

            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {

                if (future.isSuccess()) {

                    log.info("client send message: [{}]", rpcMessage);

                } else {

                    future.channel().close();

                    resultFuture.completeExceptionally(future.cause());

                    log.error("Send failed:", future.cause());

                }

            });

        } else {

            throw new IllegalStateException();

        }

        return resultFuture;

    }

    public Channel getChannel(InetSocketAddress inetSocketAddress) {

        Channel channel = channelProvider.get(inetSocketAddress);

        if (channel == null) {

            channel = doConnect(inetSocketAddress);

            channelProvider.set(inetSocketAddress, channel);

        }

        return channel;

    }

    public void close() {

        eventLoopGroup.shutdownGracefully();

    }

}

组件四、NettyRpcClientHandler

这个组件是netty服务器的核心业务处理器,是 Netty 客户端的核心业务处理器,负责处理服务端返回的消息(响应或心跳)、触发异步回调、发送心跳包以及异常处理。能够接收服务端响应消息,解析RpcMessage并完成对应的CompletableFuture,支持心跳机制检测写空闲事件并自动发送心跳请求,同时通过ConcurrentHashMap保证线程安全

1、channelRead:

从channel中读取一个message,判断这个message如果是普通的心跳信息,如果是请求的响应就提取其中的RpcResponse对象,并从unprocessedRequest中完成这个请求对应的future,最后释放msg的内存防止泄露

2、userEventTriggered:

实现了一个心跳机制,当检测到空闲(5秒没数据时)就发送心跳请求到服务端,如果发送失败就断开这个连接

3、exceptionCaught:

异常捕获,捕获到异常后自动断开连接并打印异常信息

存在不足:

1、心跳请求参数硬编码 codec、compress、data 都是硬编码

2、ctx.close() 不够优雅,直接关闭连接会导致客户端需要重新建立连接,可以增加一些重试机制

3、可以在捕获到错误后进行服务熔断等等

@Slf4j

public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {

    private final UnprocessedRequests unprocessedRequests;

    private final NettyRpcClient nettyRpcClient;

    public NettyRpcClientHandler() {

        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);

        this.nettyRpcClient = SingletonFactory.getInstance(NettyRpcClient.class);

    }

    /**

     * Read the message transmitted by the server

     */

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        try {

            log.info("client receive msg: [{}]", msg);

            if (msg instanceof RpcMessage) {

                RpcMessage tmp = (RpcMessage) msg;

                byte messageType = tmp.getMessageType();

                if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {

                    log.info("heart [{}]", tmp.getData());

                } else if (messageType == RpcConstants.RESPONSE_TYPE) {

                    RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();

                    unprocessedRequests.complete(rpcResponse);

                }

            }

        } finally {

            ReferenceCountUtil.release(msg);

        }

    }

    @Override

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {

            IdleState state = ((IdleStateEvent) evt).state();

            if (state == IdleState.WRITER_IDLE) {

                log.info("write idle happen [{}]", ctx.channel().remoteAddress());

                Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());

                RpcMessage rpcMessage = new RpcMessage();

                rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());

                rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());

                rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);

                rpcMessage.setData(RpcConstants.PING);

                channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

            }

        } else {

            super.userEventTriggered(ctx, evt);

        }

    }

    /**

     * Called when an exception occurs in processing a client message

     */

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

        log.error("client catch exception:", cause);

        cause.printStackTrace();

        ctx.close();

    }

}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐