找回密码
 立即注册
首页 业界区 业界 Netty源码—6.ByteBuf原理一

Netty源码—6.ByteBuf原理一

国语诗 4 天前
大纲
1.关于ByteBuf的问题整理
2.ByteBuf结构以及重要API
3.ByteBuf的分类
4.ByteBuf分类的补充说明
5.ByteBuf的主要内容分三大方面
6.内存分配器ByteBufAllocator
7.ByteBufAllocator的两大子类
8.PoolArena分配内存的流程
 
1.关于ByteBuf的问题整理
问题一:Netty的内存类别有哪些?
答:ByteBuf可以按三个维度来进行分类:一个是堆内和堆外,一个是Unsafe和非Unsafe,一个是Pooled和非Pooled。
 
堆内是基于byte字节数组内存进行分配,堆外是基于JDK的DirectByteBuffer内存进行分配。
 
Unsafe是通过JDK的一个unsafe对象基于物理内存地址进行数据读写,非Unsafe是直接调用JDK的API进行数据读写。
 
Pooled是预先分配的一整块内存,分配时直接用一定的算法从这整块内存里取出一块连续内存。UnPooled是每次分配内存都直接申请内存。
 
问题二:如何减少多线程内存分配之间的竞争?如何确保多线程对于同一内存分配不产生冲突?
答:一个内存分配器里维护着一个PoolArena数组,所有的内存分配都在PoolArena上进行。通过一个PoolThreadCache对象将线程和PoolArena进行一一绑定(利用ThreadLocal原理)。默认一个线程对应一个PoolArena,这样就能做到多线程内存分配相互不受影响。
 
问题三:不同大小的内存是如何进行分配的?
答:对于Page级别的内存分配与释放是直接通过完全二叉树的标记来寻找某一个连续内存的。对于Page级别以下的内存分配与释放,首先是找到一个Page,然后把此Page按照SubPage大小进行划分,最后通过位图的方式来进行内存分配与释放。
 
不管是Page级别的内存还是SubPage级别的内存,当内存被释放掉时有可能会被加入到不同级别的一个缓存队列供下一次分配使用。
 
2.ByteBuf结构以及重要API
(1)ByteBuf的结构
(2)read、write和set方法
(3)mark和reset方法
(4)retain和release方法
(5)slice、duplicate和copy方法
 
(1)ByteBuf的结构
1.jpg
ByteBuf是一个字节容器,分三部分:
第一部分是已经丢弃的字节,这部分数据是无效的;
第二部分是可读字节,这部分数据是ByteBuf的主体数据,从ByteBuf里读取的数据都是来自这部分;
第三部分是可写字节,所有写到ByteBuf的数据都会写到这一段;
 
从ByteBuf中每读取一字节,读指针readerIndex + 1。从ByteBuf中每写入一字节,写指针writerIndex + 1。ByteBuf里的可读字节数是:writerIndex - readerIndex。当writerIndex = readerIndex时,表示ByteBuf不可读。
 
capacity表示ByteBuf底层内存总容量,当writerIndex = capacity时,表示ByteBuf不可写。
 
当向ByteBuf写数据时,如果容量不足,可以进行扩容,直到capacity扩容到maxCapacity。
 
注意:ByteBuf的三个核心变量是readerIndex、writerIndex、capacity,ByteBuffer的三个核心变量是position、limit、capacity。
 
(2)read、write和set方法
readableBytes()表示ByteBuf当前可读的字节数,它的值等于writerIndex - readerIndex。readBytes(byte[] dst)表示把ByteBuf里的数据全部读取到dst,这里dst字节数组的大小通常等于readableBytes()。
 
writableBytes()表示ByteBuf当前可写的字节数,它的值等于capacity - writerIndex。writeBytes(byte[] src)表示把字节数组src里的全部数据写到ByteBuf,这里src字节数组的大小通常小于等于writableBytes()。
 
(3)mark和reset方法
markReaderIndex()表示将当前的readerIndex备份到markedReaderIndex中,markWriterIndex()表示将当前的writerIndex备份到markedWriterIndex中,resetReaderIndex()表示将当前的readerIndex设置为markedReaderIndex,resetWriterIndex()表示将当前的writerIndex设置为markedWriterIndex。
 
(4)retain和release方法
由于Netty使用了堆外内存,而堆外内存是不被JVM直接管理的。也就是说,申请到的堆外内存无法被垃圾回收器自动回收,所以需要手动回收。
 
Netty的ByteBuf是通过引用计数的方式管理的,如果一个ByteBuf没有地方被引用到,则需要回收底层内存。
 
在默认情况下,当创建完一个ByteBuf时,它的引用计数为1。然后每次调用retain()方法,它的引用计数就会加1。接着每次调用release()方法,它的引用计数就会减1。release()方法减完之后如果发现引用计数为0,则直接回收ByteBuf底层的内存。
 
所以在一个函数体内,只要增加了引用计数(包括ByteBuf的创建和手动调用retain()方法),就必须调用release()方法以免内存泄露。
 
(5)slice、duplicate和copy方法
这三个方法的返回值分别是一个新的ByteBuf对象。
 
slice()方法会从原始ByteBuf中截取一段,这段数据是从readIndex到writerIndex的,同时返回的新的ByteBuf的最大容量maxCapacity为原始ByteBuf的readableBytes()。往slice()方法返回的ByteBuf中写数据会影响原始ByteBuf。
 
duplicate()方法会把原始ByteBuf全都截取出来,包括所有的数据和指针信息。往duplicate()方法返回的ByteBuf中写数据会影响原始ByteBuf。
 
copy()方法会从原始ByteBuf中复制所有的信息,包括读写指针和底层对应的数据。往copy()方法返回的ByteBuf中写数据不会影响原始ByteBuf。
 
slice()方法与duplicate()方法的相同点是:底层内存及引用计数与原始ByteBuf共享。slice()方法或者duplicate()方法返回的ByteBuf调用write系列方法都会影响到原始ByteBuf。
 
retainedSlice()方法等价于slice().retain(),retainedDuplicate()方法等价于duplicate().retain()。
  1. //1.多次释放的例子
  2. ByteBuf buffer = xxx;
  3. doWith(buffer);
  4. //重复释放
  5. buffer.release();
  6. public void doWith(ByteBuf buffer) {
  7.     //没有增加引用计数
  8.     ByteBuf slice = buffer.slice();
  9.     foo(slice);
  10. }
  11. public void foo(ByteBuf buffer) {
  12.     //一次释放
  13.     buffer.release();
  14. }
  15. //2.不释放造成内存泄露的例子
  16. ByteBuf buffer = xxx;
  17. doWith(buffer);
  18. //引用计数为2,调用release()方法后引用计数为1,无法释放内存
  19. buffer.release();
  20. public void doWith(ByteBuf buffer) {
  21.     //增加引用计数
  22.     ByteBuf slice = buffer.retainedSlice();
  23.     foo(slice);
  24. }
  25. public void foo(ByteBuf buffer) {
  26.     //没有调用release()方法
  27. }
复制代码
 
3.ByteBuf的分类
(1)ByteBuf的类结构
(2)Pooled和Unpooled
(3)Unsafe和非Unsafe
(4)Heap和Direct
 
(1)ByteBuf的类结构
AbstractByteBuf继承自ByteBuf,主要实现了一些基本骨架的方法,如ByteBuf的一些公共属性和功能,而具体的读字节和写字节操作会放到其子类通过下划线前缀的方法来实现。
2.jpg
(2)Pooled和Unpooled
Pooled和Unpooled就是池化和非池化的分类。Pooled的内存分配每一次都是从预先分配好的一块内存里去取一段连续内存来封装成一个ByteBuf对象给应用程序。Unpooled的内存分配每一次都是直接调用系统API去向操作系统申请一块内存。所以两者最大区别是:一个是从预先分配好的内存里分配,一个是直接去分配。
 
(3)Unsafe和非Unsafe
Unsafe和非Unsafe不需要我们操心,Netty会根据系统是否有unsafe对象自行选择。JDK里有个Unsafe对象,它可以直接拿到对象的内存地址,然后可以基于这个内存地址进行读写操作。Unsafe类型的ByteBuf可以拿到ByteBuf对象在JVM里的具体内存地址,然后直接通过JDK的Unsafe进行读写。非Unsafe类型的ByteBuf则是不会依赖到JDK里的Unsafe对象。
 
(4)Heap和Direct
Heap和Direct就是堆内和堆外的分类。Heap分配出来的内存会自动受到GC的管理,不需要手动释放。Direct分配出来的内存则不受JVM控制,不参与GC垃圾回收的过程,需要手动释放以免内存泄露。UnpooledHeapByteBuf底层会依赖于一个字节数组byte[]进行所有内存相关的操作,UnpooledDirectByteBuf底层则依赖于一个JDK堆外内存对象DirectByteBuffer进行所有内存相关的操作。
 
4.ByteBuf分类的补充说明
(1)堆内存HeapByteBuf
(2)直接内存DIrectByteBuf
(3)Pooled和Unpooled
(4)池化和非池化的HeapByteBuf
 
(1)堆内存HeapByteBuf
优点是内存的分配和回收速度快,可以被JVM自动回收。缺点是如果进行Socket的IO读写,需要额外做一次内存复制。也就是将堆内存对应的缓冲区复制到内核Channel中,性能下降。
 
(2)直接内存DIrectByteBuf
在堆外进行内存分配,相比于堆内存,它的分配和回收速度还会慢一些。但是如果进行Scoket的IO读写,由于少了一次内存复制,所以速度比堆内存快。
 
所以最佳实践应该是:IO通信线程的读写缓冲区使用DirectByteBuf,后端业务消息的编解码模块使用HeapByteBuf。
 
(3)Pooled和Unpooled
Pooled的ByteBuf可以重用ByteBuf对象,它自己维护了一个内存池,可以循环利用已创建的ByteBuf,从而提升了内存的使用效率,降低了由于高负载导致的频繁GC。
 
尽管推荐使用基于Pooled的ByteBuf,但是内存池的管理和维护更加复杂,使用起来需要更加谨慎。
 
(4)池化和非池化的HeapByteBuf
UnpooledHeapByteBuf是基于堆内存进行内存分配的字节缓冲区,每次IO读写都会创建一个新的UnpooledHeapByteBuf。
 
频繁进行大块内存的分配和回收会对性能造成影响,但相比于堆外内存的申请和释放,成本还是低些。
 
UnpooledHeapByteBuf的实现原理比PooledHeapByteBuf简单,不容易出现内存管理方面的问题,满足性能下推荐UnpooledHeapByteBuf。
 
5.ByteBuf的主要内容分三大方面
一.内存与内存分配器的抽象
二.不同规格大小和不同类别的内存的分配策略
三.内存的回收过程
 
6.内存分配器ByteBufAllocator
(1)ByteBufAllocator的功能
(2)AbstractByteBufAllocator
 
(1)ByteBufAllocator的核心功能
所有类型的ByteBuf最终都是通过Netty里的内存分配器分配出来的,Netty里的内存分配器都有一个最顶层的抽象ByteBufAllocator,用于负责分配所有类型的内存。
 
ByteBufAllocator的核心功能如下:
一.buffer()
分配一块内存或者说分配一个字节缓冲区,由子类具体实现决定是Heap还是Direct。
二.ioBuffer()
分配一块DirectByteBuffer的内存。
三.heapBuffer()
在堆上进行内存分配。
四.directBuffer()
在堆外进行内存分配。
  1. //Implementations are responsible to allocate buffers. Implementations of this interface are expected to be thread-safe.
  2. public interface ByteBufAllocator {
  3.     ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
  4.     //Allocate a ByteBuf. If it is a direct or heap buffer depends on the actual implementation.
  5.     ByteBuf buffer();
  6.     //Allocate a ByteBuf with the given initial capacity.
  7.     //If it is a direct or heap buffer depends on the actual implementation.
  8.     ByteBuf buffer(int initialCapacity);
  9.     //Allocate a ByteBuf} with the given initial capacity and the given maximal capacity.
  10.     //If it is a direct or heap buffer depends on the actual implementation.
  11.     ByteBuf buffer(int initialCapacity, int maxCapacity);
  12.     //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O.
  13.     ByteBuf ioBuffer();
  14.     //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O.
  15.     ByteBuf ioBuffer(int initialCapacity);
  16.     //Allocate a ByteBuf, preferably a direct buffer which is suitable for I/O.
  17.     ByteBuf ioBuffer(int initialCapacity, int maxCapacity);
  18.     //Allocate a heap ByteBuf.
  19.     ByteBuf heapBuffer();
  20.     //Allocate a heap ByteBuf with the given initial capacity.
  21.     ByteBuf heapBuffer(int initialCapacity);
  22.     //Allocate a heap ByteBuf with the given initial capacity and the given maximal capacity.
  23.     ByteBuf heapBuffer(int initialCapacity, int maxCapacity);
  24.     //Allocate a direct ByteBuf.
  25.     ByteBuf directBuffer();
  26.     //Allocate a direct ByteBuf with the given initial capacity.
  27.     ByteBuf directBuffer(int initialCapacity);
  28.     //Allocate a direct ByteBuf with the given initial capacity and the given maximal capacity.
  29.     ByteBuf directBuffer(int initialCapacity, int maxCapacity);
  30.     //Allocate a CompositeByteBuf.
  31.     //If it is a direct or heap buffer depends on the actual implementation.
  32.     CompositeByteBuf compositeBuffer();
  33.     //Allocate a CompositeByteBuf with the given maximum number of components that can be stored in it.
  34.     //If it is a direct or heap buffer depends on the actual implementation.
  35.     CompositeByteBuf compositeBuffer(int maxNumComponents);
  36.     //Allocate a heap CompositeByteBuf.
  37.     CompositeByteBuf compositeHeapBuffer();
  38.     //Allocate a heap CompositeByteBuf with the given maximum number of components that can be stored in it.
  39.     CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
  40.     //Allocate a direct CompositeByteBuf.
  41.     CompositeByteBuf compositeDirectBuffer();
  42.     //Allocate a direct CompositeByteBuf with the given maximum number of components that can be stored in it.
  43.     CompositeByteBuf compositeDirectBuffer(int maxNumComponents);
  44.     //Returns true if direct ByteBuf's are pooled
  45.     boolean isDirectBufferPooled();
  46.     //Calculate the new capacity of a ByteBuf that is used when a ByteBuf needs to expand by the minNewCapacity with maxCapacity as upper-bound.
  47.     int calculateNewCapacity(int minNewCapacity, int maxCapacity);
  48. }
复制代码
(2)AbstractByteBufAllocator
AbstractByteBufAllocator实现了ByteBufAllocator的大部分功能,并最终暴露出两个基本的API,也就是抽象方法newDirectBuffer()和newHeapBuffer()。
 
这两个抽象方法会由PooledByteBufAllocator和UnpooledByteBufAllocator来实现。
  1. //Skeletal ByteBufAllocator implementation to extend.
  2. public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
  3.     private static final int DEFAULT_INITIAL_CAPACITY = 256;
  4.     private final boolean directByDefault;
  5.     private final ByteBuf emptyBuf;
  6.     //Instance use heap buffers by default
  7.     protected AbstractByteBufAllocator() {
  8.         this(false);
  9.     }
  10.     //Create new instance
  11.     //@param preferDirect true if #buffer(int) should try to allocate a direct buffer rather than a heap buffer
  12.     protected AbstractByteBufAllocator(boolean preferDirect) {
  13.         directByDefault = preferDirect && PlatformDependent.hasUnsafe();
  14.         emptyBuf = new EmptyByteBuf(this);
  15.     }
  16.     @Override
  17.     public ByteBuf buffer() {
  18.         if (directByDefault) {
  19.             return directBuffer();
  20.         }
  21.         return heapBuffer();
  22.     }
  23.    
  24.     @Override
  25.     public ByteBuf directBuffer() {
  26.         return directBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE);
  27.     }
  28.    
  29.     @Override
  30.     public ByteBuf heapBuffer() {
  31.         return heapBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE);
  32.     }
  33.    
  34.     @Override
  35.     public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
  36.         if (initialCapacity == 0 && maxCapacity == 0) {
  37.             return emptyBuf;
  38.         }
  39.         validate(initialCapacity, maxCapacity);
  40.         return newDirectBuffer(initialCapacity, maxCapacity);
  41.     }
  42.    
  43.     @Override
  44.     public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
  45.         if (initialCapacity == 0 && maxCapacity == 0) {
  46.             return emptyBuf;
  47.         }
  48.         validate(initialCapacity, maxCapacity);
  49.         return newHeapBuffer(initialCapacity, maxCapacity);
  50.     }
  51.    
  52.     private static void validate(int initialCapacity, int maxCapacity) {
  53.         if (initialCapacity < 0) {
  54.             throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expectd: 0+)");
  55.         }
  56.         if (initialCapacity > maxCapacity) {
  57.             throw new IllegalArgumentException(String.format("initialCapacity: %d (expected: not greater than maxCapacity(%d)", initialCapacity, maxCapacity));
  58.         }
  59.     }
  60.    
  61.     //Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
  62.     protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
  63.     //Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
  64.     protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
  65.     ...
  66. }
复制代码
 
7.ByteBufAllocator的两大子类
(1)UnpooledByteBufAllocator介绍
(2)PooledByteBufAllocator介绍
(3)PooledByteBufAllocator的结构
(4)PooledByteBufAllocator如何创建一个ByteBuf总结
 
(1)UnpooledByteBufAllocator介绍
对于UnpooledHeadByteBuf的创建,会直接new一个字节数组,并且读写指针初始化为0。对于UnpooledDirectByteBuf的创建,会直接new一个DirectByteBuffer对象。注意:其中unsafe是Netty自行判断的,如果系统支持获取unsafe对象就使用unsafe对象。
 
对比UnpooledUnsafeHeadByteBuf和UnpooledHeadByteBuf的getByte()方法可知,unsafe和非unsafe的区别如下:unsafe最终会通过对象的内存地址 + 偏移量的方式去拿到对应的数据,非unsafe最终会通过数组 + 下标或者JDK底层的ByteBuffer的API去拿到对应的数据。一般情况下,通过unsafe对象去取数据要比非unsafe要快一些,因为unsafe对象是直接通过内存地址操作的。
  1. //Simplistic ByteBufAllocator implementation that does not pool anything.
  2. public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
  3.     ...
  4.     @Override
  5.     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
  6.         return PlatformDependent.hasUnsafe() ?
  7.             new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
  8.             new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
  9.     }
  10.     @Override
  11.     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  12.         ByteBuf buf = PlatformDependent.hasUnsafe() ?
  13.             UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
  14.             new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  15.         return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
  16.     }
  17.     ...
  18. }
  19. //1.使用UnpooledUnsafeHeapByteBuf通过unsafe创建一个非池化的堆内存ByteBuf
  20. final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
  21.     ...
  22.     //Creates a new heap buffer with a newly allocated byte array.
  23.     //@param initialCapacity the initial capacity of the underlying byte array
  24.     //@param maxCapacity the max capacity of the underlying byte array
  25.     UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  26.         super(alloc, initialCapacity, maxCapacity);
  27.     }
  28.    
  29.     @Override
  30.     public byte getByte(int index) {
  31.         checkIndex(index);
  32.         return _getByte(index);
  33.     }
  34.     @Override
  35.     protected byte _getByte(int index) {
  36.         return UnsafeByteBufUtil.getByte(array, index);
  37.     }
  38.     ...
  39. }
  40. //2.使用UnpooledHeapByteBuf通过非unsafe创建一个非池化的堆内存ByteBuf
  41. public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
  42.     private final ByteBufAllocator alloc;
  43.     byte[] array;
  44.     ...
  45.     //Creates a new heap buffer with a newly allocated byte array.
  46.     //@param initialCapacity the initial capacity of the underlying byte array
  47.     //@param maxCapacity the max capacity of the underlying byte array
  48.     protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  49.         this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
  50.     }
  51.    
  52.     private UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
  53.         ...
  54.         this.alloc = alloc;
  55.         setArray(initialArray);//设置直接创建的字节数组
  56.         setIndex(readerIndex, writerIndex);//设置读写指针为0
  57.     }
  58.     private void setArray(byte[] initialArray) {
  59.         array = initialArray;
  60.         ...
  61.     }
  62.    
  63.     @Override
  64.     public byte getByte(int index) {
  65.         ensureAccessible();
  66.         return _getByte(index);
  67.     }
  68.     @Override
  69.     protected byte _getByte(int index) {
  70.         return HeapByteBufUtil.getByte(array, index);
  71.     }
  72.     ...
  73. }
  74. //3.使用UnsafeByteBufUtil.newUnsafeDirectByteBuf()创建一个非池化的直接内存ByteBuf
  75. final class UnsafeByteBufUtil {
  76.     ...
  77.     static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  78.         if (PlatformDependent.useDirectBufferNoCleaner()) {
  79.             return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
  80.         }
  81.         return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
  82.     }
  83. }
  84. public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf {
  85.     private final ByteBufAllocator alloc;
  86.     ByteBuffer buffer;
  87.     ...
  88.     //Creates a new direct buffer.
  89.     //@param initialCapacity the initial capacity of the underlying direct buffer
  90.     //@param maxCapacity     the maximum capacity of the underlying direct buffer
  91.     protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  92.         ...
  93.         this.alloc = alloc;
  94.         setByteBuffer(allocateDirect(initialCapacity), false);
  95.     }
  96.    
  97.     //Allocate a new direct ByteBuffer with the given initialCapacity.
  98.     protected ByteBuffer allocateDirect(int initialCapacity) {
  99.         //使用ByteBuffer直接分配一个DirectByteBuffer对象
  100.         return ByteBuffer.allocateDirect(initialCapacity);
  101.     }
  102.    
  103.     final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
  104.         ...
  105.         this.buffer = buffer;
  106.         ...
  107.     }
  108. }
  109. //4.使用UnpooledDirectByteBuf创建一个非池化的直接内存ByteBuf
  110. public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
  111.     private final ByteBufAllocator alloc;
  112.     private ByteBuffer buffer;
  113.     ...
  114.     //Creates a new direct buffer.  
  115.     //@param initialCapacity the initial capacity of the underlying direct buffer
  116.     //@param maxCapacity     the maximum capacity of the underlying direct buffer
  117.     protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  118.         ...
  119.         this.alloc = alloc;
  120.         //使用ByteBuffer直接分配一个DirectByteBuffer对象
  121.         setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
  122.     }
  123.    
  124.     private void setByteBuffer(ByteBuffer buffer) {
  125.         ...
  126.         this.buffer = buffer;
  127.         ...
  128.     }
  129. }
  130. public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> {
  131.     ...
  132.     //Allocates a new direct byte buffer.
  133.     public static ByteBuffer allocateDirect(int capacity) {
  134.         return new DirectByteBuffer(capacity);
  135.     }
  136.     ...
  137. }
  138. //5.unsafe和非unsafe的区别
  139. final class UnsafeByteBufUtil {
  140.     //unsafe会调用这个方法
  141.     static byte getByte(byte[] array, int index) {
  142.         return PlatformDependent.getByte(array, index);
  143.     }
  144.     ...
  145. }
  146.    
  147. final class HeapByteBufUtil {
  148.     //非unsafe会调用这个方法
  149.     static byte getByte(byte[] memory, int index) {
  150.         return memory[index];
  151.     }
  152.     ...
  153. }
复制代码
(2)PooledByteBufAllocator介绍
PooledByteBufAllocator的newHeapBuffer()方法和newDirectBuffer()方法,都会首先通过threadCache获取一个PoolThreadCache对象,然后再从该对象获取一个heapArena对象或directArena对象,最后通过heapArena对象或directArena对象的allocate()方法去分配内存。
 
具体步骤如下:
步骤一:拿到线程局部缓存PoolThreadCache
因为newHeapBuffer()和newDirectBuffer()可能会被多线程同时调用,所以threadCache.get()拿到的是当前线程的cache,一个PoolThreadLocalCache对象。
 
PoolThreadLocalCache继承自FastThreadLocal,FastThreadLocal可以当作JDK的ThreadLocal,只不过比ThreadLocal更快。
 
每个线程都有唯一的PoolThreadCache,PoolThreadCache里维护两大内存:一个是堆内存heapArena,一个是堆外内存directArena。
 
步骤二:在线程局部缓存的Arena上进行内存分配
Arena可以翻译成竞技场的意思。
 
创建PooledByteBufAllocator内存分配器时,会创建两种类型的PoolArena数组:heapArenas和directArenas。这两个数组的大小默认都是两倍CPU核数,因为这样就和创建的NIO线程数一样了。这样每个线程都可以有一个独立的PoolArena。
 
PoolThreadLocalCache的initialValue()方法中,会从PoolArena数组中获取一个PoolArena与当前线程进行绑定。对于PoolArena数组里的每个PoolArena,在分配内存时是不用加锁的。
  1. public class PooledByteBufAllocator extends AbstractByteBufAllocator {
  2.     private final PoolThreadLocalCache threadCache;
  3.     private final PoolArena<byte[]>[] heapArenas;//一个线程会和一个PoolArena绑定
  4.     private final PoolArena<ByteBuffer>[] directArenas;//一个线程会和一个PoolArena绑定
  5.     //表示threadCache.tinySubPageHeapCaches数组里的每个MemoryRegionCache元素,最多可以缓存512个ByteBuf
  6.     private final int tinyCacheSize;
  7.     //表示threadCache.smallSubPageHeapCaches数组里的每个MemoryRegionCache元素,最多可以缓存256个ByteBuf
  8.     private final int smallCacheSize;
  9.     //表示threadCache.normalHeapCaches数组里的每个MemoryRegionCache元素,最多可以缓存64个ByteBuf
  10.     private final int normalCacheSize;
  11.     ...
  12.     static {
  13.         int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
  14.         DEFAULT_PAGE_SIZE = defaultPageSize;
  15.       
  16.         int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
  17.         DEFAULT_MAX_ORDER = defaultMaxOrder;
  18.       
  19.         final Runtime runtime = Runtime.getRuntime();
  20.         final int defaultMinNumArena = runtime.availableProcessors() * 2;
  21.         final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;//8K * 2^11 = 16M
  22.         DEFAULT_NUM_HEAP_ARENA = Math.max(0,SystemPropertyUtil.getInt("io.netty.allocator.numHeapArenas",
  23.             (int) Math.min(defaultMinNumArena, runtime.maxMemory() / defaultChunkSize / 2 / 3)));
  24.         DEFAULT_NUM_DIRECT_ARENA = Math.max(0,SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
  25.             (int) Math.min(defaultMinNumArena, PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
  26.         //cache sizes
  27.         DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
  28.         DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
  29.         DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
  30.         //32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in 'Scalable memory allocation using jemalloc'
  31.         DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt("io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
  32.         //the number of threshold of allocations when cached entries will be freed up if not frequently used
  33.         DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt("io.netty.allocator.cacheTrimInterval", 8192);
  34.         ...
  35.     }
  36.    
  37.     public PooledByteBufAllocator() {
  38.         this(false);
  39.     }
  40.       
  41.     public PooledByteBufAllocator(boolean preferDirect) {
  42.         this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
  43.     }
  44.       
  45.     public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
  46.         this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
  47.             DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
  48.     }
  49.       
  50.     //默认的pageSize=8K=8192,maxOrder=11,tinyCacheSize=512,smallCacheSize=256,normalCacheSize=64
  51.     public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena,
  52.         int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
  53.         super(preferDirect);
  54.         //初始化PoolThreadLocalCache
  55.         this.threadCache = new PoolThreadLocalCache();
  56.         this.tinyCacheSize = tinyCacheSize;//512
  57.         this.smallCacheSize = smallCacheSize;//256
  58.         this.normalCacheSize = normalCacheSize;//64
  59.         //chunkSize = 8K * 2^11 = 16M
  60.         final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
  61.         ...
  62.         //pageShifts = 13
  63.         int pageShifts = validateAndCalculatePageShifts(pageSize);
  64.         if (nHeapArena > 0) {
  65.             heapArenas = newArenaArray(nHeapArena);
  66.             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
  67.             for (int i = 0; i < heapArenas.length; i ++) {
  68.                 PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
  69.                 heapArenas[i] = arena;
  70.                 metrics.add(arena);
  71.             }
  72.             heapArenaMetrics = Collections.unmodifiableList(metrics);
  73.         } else {
  74.             heapArenas = null;
  75.             heapArenaMetrics = Collections.emptyList();
  76.         }
  77.         if (nDirectArena > 0) {
  78.             directArenas = newArenaArray(nDirectArena);
  79.             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
  80.             for (int i = 0; i < directArenas.length; i ++) {
  81.                 PoolArena.DirectArena arena = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
  82.                 directArenas[i] = arena;
  83.                 metrics.add(arena);
  84.             }
  85.             directArenaMetrics = Collections.unmodifiableList(metrics);
  86.         } else {
  87.             directArenas = null;
  88.             directArenaMetrics = Collections.emptyList();
  89.         }
  90.     }
  91.     @Override
  92.     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
  93.         PoolThreadCache cache = threadCache.get();
  94.         PoolArena<byte[]> heapArena = cache.heapArena;
  95.         ByteBuf buf;
  96.         if (heapArena != null) {
  97.             //分配堆内存
  98.             buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
  99.         } else {
  100.             buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
  101.         }
  102.         return toLeakAwareBuffer(buf);
  103.     }
  104.     @Override
  105.     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  106.         PoolThreadCache cache = threadCache.get();
  107.         PoolArena<ByteBuffer> directArena = cache.directArena;
  108.         ByteBuf buf;
  109.         if (directArena != null) {
  110.             //分配直接内存
  111.             buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  112.         } else {
  113.             if (PlatformDependent.hasUnsafe()) {
  114.                 buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
  115.             } else {
  116.                 buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  117.             }
  118.         }
  119.         return toLeakAwareBuffer(buf);
  120.     }
  121.    
  122.     final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
  123.         @Override
  124.         protected synchronized PoolThreadCache initialValue() {
  125.             final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
  126.             final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
  127.             return new PoolThreadCache(
  128.                 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
  129.                 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
  130.         }
  131.         @Override
  132.         protected void onRemoval(PoolThreadCache threadCache) {
  133.             threadCache.free();
  134.         }
  135.    
  136.         private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
  137.             if (arenas == null || arenas.length == 0) {
  138.                 return null;
  139.             }
  140.             PoolArena<T> minArena = arenas[0];
  141.             for (int i = 1; i < arenas.length; i++) {
  142.                 PoolArena<T> arena = arenas[i];
  143.                 if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
  144.                     minArena = arena;
  145.                 }
  146.             }
  147.             return minArena;
  148.         }
  149.     }
  150.     ...
  151. }
  152. final class PoolThreadCache {
  153.     //PoolArena对象
  154.     final PoolArena<byte[]> heapArena;
  155.     final PoolArena<ByteBuffer> directArena;
  156.    
  157.     //ByteBuffer缓存队列
  158.     //Hold the caches for the different size classes, which are tiny, small and normal.
  159.     //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
  160.     private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
  161.     //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
  162.     private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
  163.     //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
  164.     private final MemoryRegionCache<byte[]>[] normalHeapCaches;
  165.     private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
  166.     private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
  167.     private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
  168.     ...
  169. }
复制代码
(3)PooledByteBufAllocator的结构
3.jpg
(4)PooledByteBufAllocator如何创建一个ByteBuf总结
每个线程调用PoolThreadLocalCache的get()方法时,都会拿到一个PoolThreadCache对象。然后通过PoolThreadCache对象可以拿到该线程对应的一个PoolArena对象。
 
这个PoolThreadCache对象的作用就是通过FastThreadLocal的方式,把PooledByteBufAllocator内存分配器的PoolArena数组中的一个PoolArena对象放入它的成员变量里。
 
比如第1个线程就会拿到内存分配器的heapArenas数组中的第1个PoolArena对象,第n个线程就会拿到内存分配器的heapArenas数组中的第n个PoolArena对象,从而将一个线程和一个PoolArena进行绑定了。
 
PoolThreadCache除了可以直接在这个PoolArena内存上进行分配外,还可以在它维护的一些ByteBuffer或者byte[]缓存列表上进行分配。比如我们通过PooledByteBufAllocator内存分配器创建了一个1024字节的ByteBuf,该ByteBuf被用完并释放后,可能还需要在其他地方继续分配1024字节大小的内存。这时其实不需要重新在PoolArena上进行内存分配,而可以直接从PoolThreadCache里维护的ByteBuffer或byte[]缓存列表里拿出来返回即可。
 
PooledByteBufAllocator内存分配器里维护了三个值:tinyCacheSize、smallCacheSize、normalCacheSize,tinyCacheSize表明tiny类型的ByteBuf最多可以缓存512个,smallCacheSize表明small类型的ByteBuf最多可以缓存256个,normalCacheSize表明normal类型的ByteBuf最多可以缓存64个。
 
在创建PoolThreadCache对象时,会把这3个值传递进去。然后用于创建:
tinySubPageHeapCaches、
smallSubPageHeapCaches、
normalHeapCaches、
tinySubPageDirectCaches、
smallSubPageDirectCaches、
normalDirectCaches。
 
8.PoolArena分配内存的流程
PooledByteBufAllocator内存分配器在使用其方法newHeapBuffer()和newDirectBuffer()分配内存时,会分别执行代码heapArena.allocate()和directArena.allocate(),其实就是调用PoolArena的allocate()方法。在PoolArena的allocate()方法里,会通过其抽象方法newByteBuf()创建一个PooledByteBuf对象,而具体的newByteBuf()方法会由PoolArena的子类DirectArena和HeapArena来实现。
 
PoolArena的allocate()方法分配内存的大体逻辑如下:首先通过由PoolArena子类实现的newByteBuf()方法获取一个PooledByteBuf对象,接着通过PoolArena的allocate()方法在线程私有的PoolThreadCache上分配内存,这个分配过程其实就是拿一块内存,然后初始化PooledByteBuf对象里的内存地址值。
[code]public class PooledByteBufAllocator extends AbstractByteBufAllocator {    private final PoolThreadLocalCache threadCache;    private final PoolArena[] heapArenas;//一个线程会和一个PoolArena绑定    private final PoolArena[] directArenas;//一个线程会和一个PoolArena绑定    ...    @Override    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {        PoolThreadCache cache = threadCache.get();        PoolArena heapArena = cache.heapArena;        ByteBuf buf;        if (heapArena != null) {            //分配堆内存            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);        } else {            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);        }        return toLeakAwareBuffer(buf);    }    @Override    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {        PoolThreadCache cache = threadCache.get();        PoolArena directArena = cache.directArena;        ByteBuf buf;        if (directArena != null) {            //分配直接内存            buf = directArena.allocate(cache, initialCapacity, maxCapacity);        } else {            if (PlatformDependent.hasUnsafe()) {                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);            } else {                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);            }        }        return toLeakAwareBuffer(buf);    }    ...}abstract class PoolArena implements PoolArenaMetric {    ...    PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {        PooledByteBuf buf = newByteBuf(maxCapacity);//创建ByteBuf对象        allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配        return buf;    }        private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) {        final int normCapacity = normalizeCapacity(reqCapacity);        if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8K            int tableIdx;            PoolSubpage[] table;            boolean tiny = isTiny(normCapacity);            if (tiny) {//< 512                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {                    //命中缓存,was able to allocate out of the cache so move on                    return;                }                tableIdx = tinyIdx(normCapacity);                table = tinySubpagePools;            } else {                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {                    //命中缓存,was able to allocate out of the cache so move on                    return;                }                tableIdx = smallIdx(normCapacity);                table = smallSubpagePools;            }            final PoolSubpage head = table[tableIdx];            //Synchronize on the head.             //This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.            synchronized (head) {                final PoolSubpage s = head.next;                if (s != head) {                    assert s.doNotDestroy && s.elemSize == normCapacity;                    long handle = s.allocate();                    assert handle >= 0;                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);                    if (tiny) {                        allocationsTiny.increment();                    } else {                        allocationsSmall.increment();                    }                    return;                }            }            //没有命中缓存            allocateNormal(buf, reqCapacity, normCapacity);            return;        }        if (normCapacity
您需要登录后才可以回帖 登录 | 立即注册