从 LengthFieldBasedFrameDecoder 看 netty 处理拆包

LengthFieldBasedFrameDecoder 继承自 ByteToMessageDecoder

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter

ByteToMessageDecoder 本身是一个 ChannelInboundHandler

ByteToMessageDecoder 中有 2 种数据积累器,一种拷贝式,一种组合式,默认使用拷贝式,组合式更省内存,更复杂,会慢点。

// io.netty.handler.codec.ByteToMessageDecoder#cumulator
private Cumulator cumulator = MERGE_CUMULATOR;
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        try {
            final int required = in.readableBytes();
            // 需要扩容
            if (required > cumulation.maxWritableBytes() ||
                    (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                    cumulation.isReadOnly()) {
                // Expand cumulation (by replacing it) under the following conditions:
                // - cumulation cannot be resized to accommodate the additional data
                // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                return expandCumulation(alloc, cumulation, in);
            }
            // 把新读到的数据写入积累器
            return cumulation.writeBytes(in);
        } finally {
            // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            in.release();
        }
    }
};

入口在

// io.netty.handler.codec.ByteToMessageDecoder#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 从对象池获取 CodecOutputList
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // 从 socket 读到的数据
            ByteBuf data = (ByteBuf) msg;
            // 是否第一次接收数据
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                // 数据积累器是拷贝式
                // 如果上一次由于拆包,没有解析出消息,则数据会存留在积累器中
                // 积累器接收新的数据
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 对积累器的数据进行解码
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            firedChannelRead |= out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

解码

// io.netty.handler.codec.ByteToMessageDecoder#callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            // 已解码的消息个数
            int outSize = out.size();

            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // 调用子类的 decode 方法
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
            // 前后的消息个数相等,表示没有解码新的消息
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    // 拆包情况,走这个分支,跳出循环,读取更多的数据
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
// io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}


protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (discardingTooLongFrame) {
        discardingTooLongFrame(in);
    }

    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }

    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    // 读取报文的长度字段,解析出报文长度
    // ByteBuf.getUnsignedByte 并不会移动 readerIndex
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

    if (frameLength < 0) {
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }

    frameLength += lengthAdjustment + lengthFieldEndOffset;

    if (frameLength < lengthFieldEndOffset) {
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }

    if (frameLength > maxFrameLength) {
        exceededFrameLength(in, frameLength);
        return null;
    }

    // never overflows because it's less than maxFrameLength
    int frameLengthInt = (int) frameLength;
    // 拆包情况,读到的数据小于报文的长度,无法解析,需要继续接收数据,进行第二次解析
    if (in.readableBytes() < frameLengthInt) {
        return null;
    }

    if (initialBytesToStrip > frameLengthInt) {
        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
    }
    in.skipBytes(initialBytesToStrip);

    // extract frame
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}

处理拆包的流程:
1. 从 channel 读取的数据会先放到数据积累器中
2. 使用解码器对数据进行解码,拆包情况则是接收的数据小于报文的实际长度,因此解码失败
3. 重新从 channel 读取消息,放入积累器中
4. 再次解码消息,若成功则传播到 pipeline 中,失败则继续接收消息

上一篇:Netty内存池化(一)总览


下一篇:Netty的ByteBuf API