找回密码
 立即注册
首页 业界区 业界 Netty源码—8.编解码原理

Netty源码—8.编解码原理

黎娅茜 4 天前
大纲
1.读数据入口
2.拆包原理
3.ByteToMessageDecoder解码步骤
4.解码器抽象的解码过程总结
5.Netty里常见的开箱即用的解码器
6.writeAndFlush()方法的大体步骤
7.MessageToByteEncoder的编码步骤
8.unsafe.write()写队列
9.unsafe.flush()刷新写队列
10.如何把对象变成字节流写到unsafe底层
 
1.读数据入口
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
 
在传播的过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法。
 
注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     Selector selector;
  4.     private SelectedSelectionKeySet selectedKeys;
  5.     private boolean needsToSelectAgain;
  6.     private int cancelledKeys;
  7.     ...
  8.     @Override
  9.     protected void run() {
  10.         for (;;) {
  11.             ...
  12.             //1.调用select()方法执行一次事件轮询
  13.             select(wakenUp.getAndSet(false));
  14.             if (wakenUp.get()) {
  15.                 selector.wakeup();
  16.             }
  17.             ...
  18.             //2.处理产生IO事件的Channel
  19.             needsToSelectAgain = false;
  20.             processSelectedKeys();
  21.             ...
  22.             //3.执行外部线程放入TaskQueue的任务
  23.             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  24.         }
  25.     }
  26.     
  27.     private void processSelectedKeys() {
  28.         if (selectedKeys != null) {
  29.             //selectedKeys.flip()会返回一个数组
  30.             processSelectedKeysOptimized(selectedKeys.flip());
  31.         } else {
  32.             processSelectedKeysPlain(selector.selectedKeys());
  33.         }
  34.     }
  35.     
  36.     private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  37.         for (int i = 0;; i ++) {
  38.             //1.首先取出IO事件
  39.             final SelectionKey k = selectedKeys[i];
  40.             if (k == null) {
  41.                 break;
  42.             }
  43.             selectedKeys[i] = null;//Help GC
  44.             //2.然后获取对应的Channel和处理该Channel
  45.             //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
  46.             final Object a = k.attachment();
  47.             if (a instanceof AbstractNioChannel) {
  48.                 //网络事件的处理
  49.                 processSelectedKey(k, (AbstractNioChannel) a);
  50.             } else {
  51.                 //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
  52.                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  53.                 processSelectedKey(k, task);
  54.             }
  55.             //3.最后判断是否应该再进行一次轮询
  56.             if (needsToSelectAgain) {
  57.                 for (;;) {
  58.                     i++;
  59.                     if (selectedKeys[i] == null) {
  60.                         break;
  61.                     }
  62.                     selectedKeys[i] = null;
  63.                 }
  64.                 selectAgain();
  65.                 //selectedKeys.flip()会返回一个数组
  66.                 selectedKeys = this.selectedKeys.flip();
  67.                 i = -1;
  68.             }
  69.         }
  70.     }
  71.     
  72.     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  73.         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  74.         ...
  75.         try {
  76.             int readyOps = k.readyOps();
  77.             ...
  78.             //新连接已准备接入或者已经存在的连接有数据可读
  79.             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  80.                 //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法
  81.                 //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法
  82.                 unsafe.read();
  83.                 if (!ch.isOpen()) {
  84.                     return;
  85.                 }
  86.             }
  87.         } catch (CancelledKeyException ignored) {
  88.             unsafe.close(unsafe.voidPromise());
  89.         }
  90.     }
  91.     ...
  92. }
  93. public abstract class AbstractNioByteChannel extends AbstractNioChannel {
  94.     ...
  95.     protected class NioByteUnsafe extends AbstractNioUnsafe {
  96.         ...
  97.         @Override
  98.         public final void read() {
  99.             final ChannelConfig config = config();
  100.             final ChannelPipeline pipeline = pipeline();
  101.             //创建ByteBuf分配器
  102.             final ByteBufAllocator allocator = config.getAllocator();
  103.             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  104.             allocHandle.reset(config);
  105.             ByteBuf byteBuf = null;
  106.             do {
  107.                 //1.分配一个ByteBuf
  108.                 byteBuf = allocHandle.allocate(allocator);
  109.                 //2.将数据读取到分配的ByteBuf中
  110.                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
  111.                 if (allocHandle.lastBytesRead() <= 0) {
  112.                     byteBuf.release();
  113.                     byteBuf = null;
  114.                     close = allocHandle.lastBytesRead() < 0;
  115.                     break;
  116.                 }
  117.                 ...
  118.                 //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
  119.                 pipeline.fireChannelRead(byteBuf);
  120.                 byteBuf = null;
  121.             } while (allocHandle.continueReading());
  122.             allocHandle.readComplete();
  123.             //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
  124.             pipeline.fireChannelReadComplete();
  125.             ...
  126.         }
  127.     }
  128. }
复制代码
(3)基于分隔符解码器
可以向基于分隔符解码器DelimiterBasedFrameDecoder传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的decode()方法和基于行分隔符解码器的decode()方法基本类似。
[code]//A decoder that splits the received ByteBufs by one or more delimiters.  public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {    private final ByteBuf[] delimiters;    private final int maxFrameLength;    private final boolean stripDelimiter;    private final boolean failFast;    private boolean discardingTooLongFrame;    private int tooLongFrameLength;    private final LineBasedFrameDecoder lineBasedDecoder;    ...    //Creates a new instance.    //@param maxFrameLength,the maximum length of the decoded frame.    //A TooLongFrameException is thrown if the length of the frame exceeds this value.    //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not    //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder     //notices the length of the frame will exceed maxFrameLength regardless of     //whether the entire frame has been read.    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.    //@param delimiters  the delimiters    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {        validateMaxFrameLength(maxFrameLength);        if (delimiters == null) {            throw new NullPointerException("delimiters");        }        if (delimiters.length == 0) {            throw new IllegalArgumentException("empty delimiters");        }        if (isLineBased(delimiters) && !isSubclass()) {            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);            this.delimiters = null;        } else {            this.delimiters = new ByteBuf[delimiters.length];            for (int i = 0; i < delimiters.length; i ++) {                ByteBuf d = delimiters;                validateDelimiter(d);                this.delimiters = d.slice(d.readerIndex(), d.readableBytes());            }            lineBasedDecoder = null;        }        this.maxFrameLength = maxFrameLength;        this.stripDelimiter = stripDelimiter;        this.failFast = failFast;    }        //Returns true if the delimiters are "\n" and "\r\n".    private static boolean isLineBased(final ByteBuf[] delimiters) {        if (delimiters.length != 2) {            return false;        }        ByteBuf a = delimiters[0];        ByteBuf b = delimiters[1];        if (a.capacity() < b.capacity()) {            a = delimiters[1];            b = delimiters[0];        }        return a.capacity() == 2 && b.capacity() == 1            && a.getByte(0) == '\r' && a.getByte(1) == '\n'            && b.getByte(0) == '\n';    }    //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder    private boolean isSubclass() {        return getClass() != DelimiterBasedFrameDecoder.class;    }    @Override    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        Object decoded = decode(ctx, in);        if (decoded != null) {            out.add(decoded);        }    }    //Create a frame out of the {@link ByteBuf} and return it.    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to    //@param   buffer,the ByteBuf from which to read data    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {        if (lineBasedDecoder != null) {            return lineBasedDecoder.decode(ctx, buffer);        }        //Try all delimiters and choose the delimiter which yields the shortest frame.        int minFrameLength = Integer.MAX_VALUE;        ByteBuf minDelim = null;        for (ByteBuf delim: delimiters) {            int frameLength = indexOf(buffer, delim);            if (frameLength >= 0 && frameLength < minFrameLength) {                minFrameLength = frameLength;                minDelim = delim;            }        }        if (minDelim != null) {            int minDelimLength = minDelim.capacity();            ByteBuf frame;            if (discardingTooLongFrame) {                //We've just finished discarding a very large frame.                //Go back to the initial state.                discardingTooLongFrame = false;                buffer.skipBytes(minFrameLength + minDelimLength);                int tooLongFrameLength = this.tooLongFrameLength;                this.tooLongFrameLength = 0;                if (!failFast) {                    fail(tooLongFrameLength);                }                return null;            }            if (minFrameLength > maxFrameLength) {                //Discard read frame.                buffer.skipBytes(minFrameLength + minDelimLength);                fail(minFrameLength);                return null;            }            if (stripDelimiter) {                frame = buffer.readRetainedSlice(minFrameLength);                buffer.skipBytes(minDelimLength);            } else {                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);            }            return frame;        } else {            if (!discardingTooLongFrame) {                if (buffer.readableBytes() > maxFrameLength) {                    //Discard the content of the buffer until a delimiter is found.                    tooLongFrameLength = buffer.readableBytes();                    buffer.skipBytes(buffer.readableBytes());                    discardingTooLongFrame = true;                    if (failFast) {                        fail(tooLongFrameLength);                    }                }            } else {                //Still discarding the buffer since a delimiter is not found.                tooLongFrameLength += buffer.readableBytes();                buffer.skipBytes(buffer.readableBytes());            }            return null;        }    }        private void fail(long frameLength) {        if (frameLength > 0) {            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");        } else {            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");        }    }    //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack.      //-1 is returned if no needle is found in the haystack.    private static int indexOf(ByteBuf haystack, ByteBuf needle) {        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {            int haystackIndex = i;            int needleIndex;            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {                    break;                } else {                    haystackIndex ++;                    if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {                        return -1;                    }                }            }            if (needleIndex == needle.capacity()) {                //Found the needle from the haystack!                return i - haystack.readerIndex();            }        }        return -1;    }    private static void validateDelimiter(ByteBuf delimiter) {        if (delimiter == null) {            throw new NullPointerException("delimiter");        }        if (!delimiter.isReadable()) {            throw new IllegalArgumentException("empty delimiter");        }    }    private static void validateMaxFrameLength(int maxFrameLength) {        if (maxFrameLength
您需要登录后才可以回帖 登录 | 立即注册