[netty5: ChunkedInput & ChunkedWriteHandler]-源码分析
是 Netty 中用于按块读取不定长数据流的接口,常配合实现流式写入,支持如文件、流、HTTP 和 WebSocket 等多种数据源。
·
ChunkedInput
ChunkedInput<B> 是 Netty 中用于按块读取不定长数据流的接口,常配合 ChunkedWriteHandler 实现流式写入,支持如文件、流、HTTP 和 WebSocket 等多种数据源。
| 实现类 | 简要说明 |
|---|---|
ChunkedFile |
用于将常规文件按块传输(使用传统阻塞 IO)。 |
ChunkedNioFile |
用于将 FileChannel 形式的文件通过 NIO 按块传输。 |
ChunkedNioStream |
将 ReadableByteChannel 数据源作为块状输入,适用于 NIO 输入流。 |
ChunkedStream |
将 InputStream(阻塞流)按块读取并传输。 |
Http2DataChunkedInput |
专为 HTTP/2 数据帧的按块输入设计,用于发送 DATA 帧。 |
HttpChunkedInput |
将 HttpContent 对象(如文件块)封装为支持 trailer 的块状 HTTP 输出。 |
WebSocketChunkedInput |
用于 WebSocket 数据帧分块传输,支持大帧拆分成多个 WebSocket 帧。 |
public interface ChunkedInput<B> extends AutoCloseable {
// 是否已经读取到输入流末尾
boolean isEndOfInput() throws Exception;
// 从数据流中读取下一段数据块(chunk)
B readChunk(BufferAllocator allocator) throws Exception;
// 返回整个输入源的长度(如果已知)
long length();
// 返回目前已经“传输”的字节数
long progress();
}
ChunkedWriteHandler
ChunkedWriteHandler 是 Netty 中用于分块写入大数据流(如文件、视频流等)的处理器,核心职责是将大数据拆成小块逐步异步写入,避免一次性占用大量内存,提高传输效率和系统稳定性。主要特点和功能包括:
- 分块写入:支持 ChunkedInput 类型的数据流,按块读取并写入,适合无法一次性全部加载到内存的大数据。
- 异步处理:内部维护一个待写队列(PendingWrite),通过事件驱动机制逐块写出数据,保证非阻塞的高效传输。
- 资源管理:在写入完成或异常关闭时,会自动关闭数据流,释放资源,防止内存泄漏。
- 错误处理:遇到写入失败或通道关闭时,能正确通知每个待写任务失败,并清理队列。
- 流控制:自动管理写请求,避免写入过快导致拥塞,通过事件循环调度写操作。
public class ChunkedWriteHandler implements ChannelHandler {
private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);
private Queue<PendingWrite> queue;
private volatile ChannelHandlerContext ctx;
public ChunkedWriteHandler() {}
private void allocateQueue() {
if (queue == null) {
queue = new ArrayDeque<>();
}
}
private boolean queueIsEmpty() {
return queue == null || queue.isEmpty();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
public void resumeTransfer() {
final ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
return;
}
if (ctx.executor().inEventLoop()) {
resumeTransfer0(ctx);
} else {
ctx.executor().execute(() -> resumeTransfer0(ctx));
}
}
private void resumeTransfer0(ChannelHandlerContext ctx) {
try {
doFlush(ctx);
} catch (Exception e) {
logger.warn("Unexpected exception while sending chunks.", e);
}
}
// 如果当前有待写队列(queue)不为空,或者写入的消息是 ChunkedInput(分块数据流),则将写操作封装为 PendingWrite 并加入队列,返回对应的 Future(异步写结果)。
// 否则直接调用下游的 ctx.write(msg)。
@Override
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
if (!queueIsEmpty() || msg instanceof ChunkedInput) {
allocateQueue();
Promise<Void> promise = ctx.newPromise();
queue.add(new PendingWrite(msg, promise));
return promise.asFuture();
} else {
return ctx.write(msg);
}
}
@Override
public void flush(ChannelHandlerContext ctx) {
doFlush(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
doFlush(ctx);
ctx.fireChannelInactive();
}
// 实现基于通道写缓冲区状态的流控,防止写过快导致内存溢出
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
doFlush(ctx);
}
ctx.fireChannelWritabilityChanged();
}
// 从 queue 中依次取出 PendingWrite,若是 ChunkedInput 则判断是否写完,关闭资源,并根据情况调用 fail() 或 success()。
private void discard(Throwable cause) {
if (queueIsEmpty()) {
return;
}
for (;;) {
PendingWrite currentWrite = queue.poll();
if (currentWrite == null) {
break;
}
Object message = currentWrite.msg;
if (message instanceof ChunkedInput) {
ChunkedInput<?> in = (ChunkedInput<?>) message;
boolean endOfInput;
try {
endOfInput = in.isEndOfInput();
closeInput(in);
} catch (Exception e) {
closeInput(in);
currentWrite.fail(e);
logger.warn("ChunkedInput failed", e);
continue;
}
if (!endOfInput) {
if (cause == null) {
cause = new ClosedChannelException();
}
currentWrite.fail(cause);
} else {
currentWrite.success();
}
} else {
if (cause == null) {
cause = new ClosedChannelException();
}
currentWrite.fail(cause);
}
}
}
private void doFlush(final ChannelHandlerContext ctx) {
final Channel channel = ctx.channel();
// 如果通道不活跃(比如已关闭),调用 discard(null) 清空队列,释放资源,
// 随后调用 ctx.flush() 以确保之前写过但未刷新的数据也被处理,最后直接返回。
if (!channel.isActive()) {
discard(null);
ctx.flush();
return;
}
// 如果待写队列为空,直接调用 flush(),然后返回。
if (queueIsEmpty()) {
ctx.flush();
return;
}
// 标记是否最终需要调用 flush()。
boolean requiresFlush = true;
// 获取缓冲区分配器,用于分配内存。
BufferAllocator allocator = ctx.bufferAllocator();
// 只要通道可写(写缓冲区未满),就尝试写数据。防止写入过快导致拥塞。
while (channel.isWritable()) {
// 从队列头获取当前待写项(不移除)
final PendingWrite currentWrite = queue.peek();
if (currentWrite == null) {
break;
}
// 如果当前待写的 promise 已经完成(可能之前写失败了),直接移除该项并继续处理下一个。
if (currentWrite.promise.isDone()) {
queue.remove();
continue;
}
final Object pendingMessage = currentWrite.msg;
// 判断当前待写消息是否是 ChunkedInput 类型(分块输入流)。
if (pendingMessage instanceof ChunkedInput) {
final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
boolean endOfInput;
boolean suspend;
Object message = null;
try {
// 从 ChunkedInput 中读取下一块数据,分配内存。
message = chunks.readChunk(allocator);
// 判断是否读取到输入末尾。
endOfInput = chunks.isEndOfInput();
// suspend 标记如果当前块是 null 且未到末尾,说明暂时没有数据可写,需要挂起等待更多数据。
suspend = message == null && !endOfInput;
} catch (final Throwable t) {
// 如果读取过程异常,清理资源,调用失败回调,并跳出循环。
queue.remove();
if (message != null) {
Resource.dispose(message);
}
closeInput(chunks);
currentWrite.fail(t);
break;
}
// 如果需要挂起等待数据,则退出写入循环。
if (suspend) {
break;
}
// 如果块数据为空(null),则分配一个空缓冲区写入,防止写空消息时出现问题。
if (message == null) {
message = allocator.allocate(0);
}
// 如果已到输入末尾,移除队列当前项。
if (endOfInput) {
queue.remove();
}
// 写入当前块并立即刷新。
Future<Void> f = ctx.writeAndFlush(message);
// 如果已经到末尾,写完成后调用 handleEndOfInputFuture 处理关闭输入流、通知完成等。
if (endOfInput) {
if (f.isDone()) {
handleEndOfInputFuture(f, chunks, currentWrite);
} else {
f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));
}
} else {
// 如果未到末尾,调用 handleFuture 处理写入完成后的逻辑,比如继续写或暂停写。
final boolean resume = !channel.isWritable();
if (f.isDone()) {
handleFuture(channel, f, chunks, currentWrite, resume);
} else {
f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));
}
}
// 由于已经调用了 writeAndFlush,此时不需要额外再调用 flush()。
requiresFlush = false;
} else {
// 非 ChunkedInput 处理
// 对于非分块输入,直接从队列移除,调用 write() 异步写入,并将结果与当前 promise 关联。
// 标记需要最后调用 flush()。
queue.remove();
ctx.write(pendingMessage).cascadeTo(currentWrite.promise);
requiresFlush = true;
}
// 每写完一个任务后检测通道状态,若关闭,则调用 discard 清理剩余队列,退出循环。
if (!channel.isActive()) {
discard(new ClosedChannelException());
break;
}
}
// 如果循环中没主动调用 flush(),则最后统一调用。
if (requiresFlush) {
ctx.flush();
}
}
// 在最后一个 chunk 写完后调用: 处理 ChunkedInput 写入完成后的清理逻辑
private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {
closeInput(input);
if (future.isFailed()) {
currentWrite.fail(future.cause());
} else {
currentWrite.success();
}
}
// 处理 非末尾 chunk 写入完成后的回调逻辑, 根据是否写入成功、是否需要继续写,来决定是否恢复 chunk 的发送
private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,
PendingWrite currentWrite, boolean resume) {
if (future.isFailed()) {
closeInput(input);
currentWrite.fail(future.cause());
} else {
if (resume && channel.isWritable()) {
resumeTransfer();
}
}
}
// 关闭分块输入流,捕获并记录异常。
private static void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();
} catch (Throwable t) {
logger.warn("Failed to close a ChunkedInput.", t);
}
}
}
ChunkedWriteHandler.PendingWrite
PendingWrite 是一个写任务的封装器,绑定了待写入数据和写完成通知,是实现异步分块写入的基础数据结构。
private static final class PendingWrite {
final Object msg;
final Promise<Void> promise;
PendingWrite(Object msg, Promise<Void> promise) {
this.msg = msg;
this.promise = promise;
}
void fail(Throwable cause) {
promise.tryFailure(cause);
if (Resource.isAccessible(msg, false)) {
SilentDispose.dispose(msg, logger);
}
}
void success() {
if (promise.isDone()) {
return;
}
promise.trySuccess(null);
}
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)