ChunkedInput

ChunkedInput<B> 是 Netty 中用于按块读取不定长数据流的接口,常配合 ChunkedWriteHandler 实现流式写入,支持如文件、流、HTTP 和 WebSocket 等多种数据源。

实现类 简要说明
ChunkedFile 用于将常规文件按块传输(使用传统阻塞 IO)。
ChunkedNioFile 用于将 FileChannel 形式的文件通过 NIO 按块传输。
ChunkedNioStream ReadableByteChannel 数据源作为块状输入,适用于 NIO 输入流。
ChunkedStream InputStream(阻塞流)按块读取并传输。
Http2DataChunkedInput 专为 HTTP/2 数据帧的按块输入设计,用于发送 DATA 帧。
HttpChunkedInput HttpContent 对象(如文件块)封装为支持 trailer 的块状 HTTP 输出。
WebSocketChunkedInput 用于 WebSocket 数据帧分块传输,支持大帧拆分成多个 WebSocket 帧。
public interface ChunkedInput<B> extends AutoCloseable {
	// 是否已经读取到输入流末尾
    boolean isEndOfInput() throws Exception;

	// 从数据流中读取下一段数据块(chunk)
    B readChunk(BufferAllocator allocator) throws Exception;

	// 返回整个输入源的长度(如果已知)
    long length();

	// 返回目前已经“传输”的字节数
    long progress();
}

ChunkedWriteHandler

ChunkedWriteHandler 是 Netty 中用于分块写入大数据流(如文件、视频流等)的处理器,核心职责是将大数据拆成小块逐步异步写入,避免一次性占用大量内存,提高传输效率和系统稳定性。主要特点和功能包括:

  1. 分块写入:支持 ChunkedInput 类型的数据流,按块读取并写入,适合无法一次性全部加载到内存的大数据。
  2. 异步处理:内部维护一个待写队列(PendingWrite),通过事件驱动机制逐块写出数据,保证非阻塞的高效传输。
  3. 资源管理:在写入完成或异常关闭时,会自动关闭数据流,释放资源,防止内存泄漏。
  4. 错误处理:遇到写入失败或通道关闭时,能正确通知每个待写任务失败,并清理队列。
  5. 流控制:自动管理写请求,避免写入过快导致拥塞,通过事件循环调度写操作。
public class ChunkedWriteHandler implements ChannelHandler {

    private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);

    private Queue<PendingWrite> queue;
    private volatile ChannelHandlerContext ctx;

    public ChunkedWriteHandler() {}

    private void allocateQueue() {
        if (queue == null) {
            queue = new ArrayDeque<>();
        }
    }

    private boolean queueIsEmpty() {
        return queue == null || queue.isEmpty();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }
    
    public void resumeTransfer() {
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            return;
        }
        if (ctx.executor().inEventLoop()) {
            resumeTransfer0(ctx);
        } else {
            ctx.executor().execute(() -> resumeTransfer0(ctx));
        }
    }

    private void resumeTransfer0(ChannelHandlerContext ctx) {
        try {
            doFlush(ctx);
        } catch (Exception e) {
            logger.warn("Unexpected exception while sending chunks.", e);
        }
    }

	// 如果当前有待写队列(queue)不为空,或者写入的消息是 ChunkedInput(分块数据流),则将写操作封装为 PendingWrite 并加入队列,返回对应的 Future(异步写结果)。
	// 否则直接调用下游的 ctx.write(msg)。
    @Override
    public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
        if (!queueIsEmpty() || msg instanceof ChunkedInput) {
            allocateQueue();
            Promise<Void> promise = ctx.newPromise();
            queue.add(new PendingWrite(msg, promise));
            return promise.asFuture();
        } else {
            return ctx.write(msg);
        }
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        doFlush(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
        ctx.fireChannelInactive();
    }

	// 实现基于通道写缓冲区状态的流控,防止写过快导致内存溢出
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

	// 从 queue 中依次取出 PendingWrite,若是 ChunkedInput 则判断是否写完,关闭资源,并根据情况调用 fail() 或 success()。
    private void discard(Throwable cause) {
        if (queueIsEmpty()) {
            return;
        }
        for (;;) {
            PendingWrite currentWrite = queue.poll();

            if (currentWrite == null) {
                break;
            }
            Object message = currentWrite.msg;
            if (message instanceof ChunkedInput) {
                ChunkedInput<?> in = (ChunkedInput<?>) message;
                boolean endOfInput;
                try {
                    endOfInput = in.isEndOfInput();
                    closeInput(in);
                } catch (Exception e) {
                    closeInput(in);
                    currentWrite.fail(e);
                    logger.warn("ChunkedInput failed", e);
                    continue;
                }

                if (!endOfInput) {
                    if (cause == null) {
                        cause = new ClosedChannelException();
                    }
                    currentWrite.fail(cause);
                } else {
                    currentWrite.success();
                }
            } else {
                if (cause == null) {
                    cause = new ClosedChannelException();
                }
                currentWrite.fail(cause);
            }
        }
    }

    private void doFlush(final ChannelHandlerContext ctx) {
        final Channel channel = ctx.channel();
       // 如果通道不活跃(比如已关闭),调用 discard(null) 清空队列,释放资源,
       // 随后调用 ctx.flush() 以确保之前写过但未刷新的数据也被处理,最后直接返回。
        if (!channel.isActive()) {
            discard(null);
            ctx.flush();
            return;
        }
		
		// 如果待写队列为空,直接调用 flush(),然后返回。
        if (queueIsEmpty()) {
            ctx.flush();
            return;
        }
		
		// 标记是否最终需要调用 flush()。
        boolean requiresFlush = true;
        // 获取缓冲区分配器,用于分配内存。
        BufferAllocator allocator = ctx.bufferAllocator();
        // 只要通道可写(写缓冲区未满),就尝试写数据。防止写入过快导致拥塞。
        while (channel.isWritable()) {
        	// 从队列头获取当前待写项(不移除)
            final PendingWrite currentWrite = queue.peek();
            if (currentWrite == null) {
                break;
            }

			// 如果当前待写的 promise 已经完成(可能之前写失败了),直接移除该项并继续处理下一个。
            if (currentWrite.promise.isDone()) {
                queue.remove();
                continue;
            }

            final Object pendingMessage = currentWrite.msg;
            // 判断当前待写消息是否是 ChunkedInput 类型(分块输入流)。
            if (pendingMessage instanceof ChunkedInput) {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try {
                	// 从 ChunkedInput 中读取下一块数据,分配内存。
                    message = chunks.readChunk(allocator);
                    // 判断是否读取到输入末尾。
                    endOfInput = chunks.isEndOfInput();
                    // suspend 标记如果当前块是 null 且未到末尾,说明暂时没有数据可写,需要挂起等待更多数据。
                    suspend = message == null && !endOfInput;

                } catch (final Throwable t) {
                	// 如果读取过程异常,清理资源,调用失败回调,并跳出循环。
                    queue.remove();

                    if (message != null) {
                        Resource.dispose(message);
                    }

                    closeInput(chunks);
                    currentWrite.fail(t);
                    break;
                }

				// 如果需要挂起等待数据,则退出写入循环。
                if (suspend) {
                    break;
                }

				// 如果块数据为空(null),则分配一个空缓冲区写入,防止写空消息时出现问题。
                if (message == null) {
                    message = allocator.allocate(0);
                }

				// 如果已到输入末尾,移除队列当前项。
                if (endOfInput) {
                    queue.remove();
                }
                // 写入当前块并立即刷新。
                Future<Void> f = ctx.writeAndFlush(message);
	
				// 如果已经到末尾,写完成后调用 handleEndOfInputFuture 处理关闭输入流、通知完成等。
                if (endOfInput) {
                    if (f.isDone()) {
                        handleEndOfInputFuture(f, chunks, currentWrite);
                    } else {
                        f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));
                    }
                } else {
                // 如果未到末尾,调用 handleFuture 处理写入完成后的逻辑,比如继续写或暂停写。
                    final boolean resume = !channel.isWritable();
                    if (f.isDone()) {
                        handleFuture(channel, f, chunks, currentWrite, resume);
                    } else {
                        f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));
                    }
                }
                // 由于已经调用了 writeAndFlush,此时不需要额外再调用 flush()。
                requiresFlush = false;
            } else {
            // 非 ChunkedInput 处理
            	// 对于非分块输入,直接从队列移除,调用 write() 异步写入,并将结果与当前 promise 关联。
            	// 标记需要最后调用 flush()。
                queue.remove();
                ctx.write(pendingMessage).cascadeTo(currentWrite.promise);
                requiresFlush = true;
            }

			// 每写完一个任务后检测通道状态,若关闭,则调用 discard 清理剩余队列,退出循环。
            if (!channel.isActive()) {
                discard(new ClosedChannelException());
                break;
            }
        }
		// 如果循环中没主动调用 flush(),则最后统一调用。
        if (requiresFlush) {
            ctx.flush();
        }
    }

	// 在最后一个 chunk 写完后调用: 处理 ChunkedInput 写入完成后的清理逻辑
    private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {
        closeInput(input);
        if (future.isFailed()) {
            currentWrite.fail(future.cause());
        } else {
            currentWrite.success();
        }
    }

	// 处理 非末尾 chunk 写入完成后的回调逻辑, 根据是否写入成功、是否需要继续写,来决定是否恢复 chunk 的发送
    private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,
                              PendingWrite currentWrite, boolean resume) {
        if (future.isFailed()) {
            closeInput(input);
            currentWrite.fail(future.cause());
        } else {
            if (resume && channel.isWritable()) {
                resumeTransfer();
            }
        }
    }

	// 关闭分块输入流,捕获并记录异常。
    private static void closeInput(ChunkedInput<?> chunks) {
        try {
            chunks.close();
        } catch (Throwable t) {
            logger.warn("Failed to close a ChunkedInput.", t);
        }
    }

}

ChunkedWriteHandler.PendingWrite

PendingWrite 是一个写任务的封装器,绑定了待写入数据和写完成通知,是实现异步分块写入的基础数据结构。

private static final class PendingWrite {
    final Object msg;
    final Promise<Void> promise;

    PendingWrite(Object msg, Promise<Void> promise) {
        this.msg = msg;
        this.promise = promise;
    }

    void fail(Throwable cause) {
        promise.tryFailure(cause);
        if (Resource.isAccessible(msg, false)) {
            SilentDispose.dispose(msg, logger);
        }
    }

    void success() {
        if (promise.isDone()) {
            return;
        }
        promise.trySuccess(null);
    }
}
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐