大纲
9.Netty的内存规格
10.缓存数据结构
11.命中缓存的分配流程
12.Netty里有关内存分配的重要概念
13.Page级别的内存分配
14.SubPage级别的内存分配
15.ByteBuf的回收
9.Netty的内存规格
(1)4种内存规格
(2)内存申请单位
(1)4种内存规格
一.tiny:表示从0到512字节之间的内存大小
二.small:表示从512字节到8K范围的内存大小
三.normal:表示从8K到16M范围的内存大小
四.huge:表示大于16M的内存大小
(2)内存申请单位
Netty里所有的内存申请都是以Chunk为单位向操作系统申请的,后续所有的内存分配都是在这个Chunk里进行对应的操作。比如要分配1M的内存,那么首先要申请一个16M的Chunk,然后在这个16M的Chunk里取出一段1M的连续内存放入到Netty的ByteBuf里。
注意:一个Chunk的大小为16M,一个Page的大小为8K,一个SubPage的大小是0~8K,一个Chunk可以分成2048个Page。
10.缓存数据结构
(1)MemoryRegionCache的组成
(2)MemoryRegionCache的类型
(3)MemoryRegionCache的源码
(1)MemoryRegionCache的组成
Netty中与缓存相关的数据结构叫MemoryRegionCache,这是内存相关的一个缓存。MemoryRegionCache由三部分组成:queue、sizeClass、size。
一.queue
queue是一个队列,里面的每个元素都是MemoryRegionCache内部类Entry的一个实体,每一个Entry实体里都有一个chunk和一个handle。Netty里所有的内存都是以Chunk为单位进行分配的,而每一个handle都指向唯一一段连续的内存。所以一个chunk + 一个指向连续内存的handle,就能确定这块Entry的内存大小和内存位置,然后所有这些Entry组合起来就变成一个缓存的链。
二.sizeClass
sizeClass是Netty里的内存规格,其中有三种类型的内存规则。一种是tiny(0~512B),一种是small(512B~8K),一种是normal(8K~16M)。由于huge是直接使用非缓存的内存分配,所以不在该sizeClass范围内。
三.size
一个MemoryRegionCache所缓存的一个ByteBuf的大小是固定的。如果MemoryRegionCache里缓存了1K的ByteBuf,那么queue里所有的元素都是1K的ByteBuf。也就是说,同一个MemoryRegionCache它的queue里的所有元素都是固定大小的。这些固定大小分别有:tiny类型规则的是16B的整数倍直到498B,small类型规则的有512B、1K、2K、4K,normal类型规定的有8K、16K、32K。所以对于32K以上是不缓存的。
(2)MemoryRegionCache的类型
Netty里所有规格的MemoryRegionCache如下图示,下面的每个节点就相当于一个MemoryRegionCache的数据结构。
其中tiny类型的内存规格有32种,也就是32个节点,分别是16B、32B、48B、......、496B。这里面的每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个16B的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue队列里取出一个Entry元素。通过该Entry元素可以拿到它属于哪一个chunk以及哪一个handle,从而进行内存划分。
small类型的内存规格有4种,也就是4个节点,分别是512B、1K、2K、4K。每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个1K的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue里取出一个Entry元素。这样就可以基于这个Entry元素分配出1K内存的ByteBuf,不需要再去Chunk上找一段临时内存了。
normal类型的内存规格有3种,也就是3个节点,分别是8K、16K、32K,关于Normal大小的ByteBuf的内存分配也是同样道理。
(3)MemoryRegionCache的源码
每个线程都会有一个PoolThreadCache对象,每个PoolThreadCache对象都会有tiny、small、normal三种规格的缓存。每种规格又分heap和direct,所以每个PoolThreadCache对象会有6种缓存。PoolThreadCache类正是使用了6个MemoryRegionCache数组来维护这6种缓存。如:
数组tinySubPageHeapCaches拥有32个MemoryRegionCache元素,下标为n的元素用于缓存大小为n * 16B的ByteBuf。
数组smallSubPageHeapCaches拥有4个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 512B的ByteBuf。
数组normalHeapCaches拥有3个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 8K的ByteBuf。
数组tinySubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存tinyCacheSize个即512个ByteBuf。
数组smallSubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存smallCacheSize个即256个ByteBuf。
数组normalHeapCaches里的每个MemoryRegionCache元素,最多可以缓存normalCacheSize个即64个ByteBuf。- final class PoolThreadCache {
- //真正要分配的内存其实就是byte[] 或者 ByteBuffer,所以实际的分配就是得到一个数值handle进行定位
- final PoolArena<byte[]> heapArena;
- final PoolArena<ByteBuffer> directArena;
- //Hold the caches for the different size classes, which are tiny, small and normal.
- //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
- //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
- //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
- private final MemoryRegionCache<byte[]>[] normalHeapCaches;
- private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
-
- PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
- int tinyCacheSize, int smallCacheSize, int normalCacheSize,
- int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
- ...
- this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
- this.heapArena = heapArena;
- this.directArena = directArena;
- if (directArena != null) {
- tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
- smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
- numShiftsNormalDirect = log2(directArena.pageSize);
- normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
- directArena.numThreadCaches.getAndIncrement();
- } else {
- //No directArea is configured so just null out all caches
- tinySubPageDirectCaches = null;
- smallSubPageDirectCaches = null;
- normalDirectCaches = null;
- numShiftsNormalDirect = -1;
- }
- if (heapArena != null) {
- //Create the caches for the heap allocations
- tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
- smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
- numShiftsNormalHeap = log2(heapArena.pageSize);
- normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
- heapArena.numThreadCaches.getAndIncrement();
- } else {
- //No heapArea is configured so just null out all caches
- tinySubPageHeapCaches = null;
- smallSubPageHeapCaches = null;
- normalHeapCaches = null;
- numShiftsNormalHeap = -1;
- }
- //The thread-local cache will keep a list of pooled buffers which must be returned to the pool when the thread is not alive anymore.
- ThreadDeathWatcher.watch(thread, freeTask);
- }
-
- private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {
- if (cacheSize > 0) {
- @SuppressWarnings("unchecked")
- MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
- for (int i = 0; i < cache.length; i++) {
- cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
- }
- return cache;
- } else {
- return null;
- }
- }
- private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
- if (cacheSize > 0) {
- int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
- int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
- @SuppressWarnings("unchecked")
- MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
- for (int i = 0; i < cache.length; i++) {
- cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
- }
- return cache;
- } else {
- return null;
- }
- }
-
- private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
- SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
- super(size, sizeClass);
- }
- ...
- }
- private static int log2(int val) {
- int res = 0;
- while (val > 1) {
- val >>= 1;
- res++;
- }
- return res;
- }
-
- ...
-
- private abstract static class MemoryRegionCache<T> {
- private final int size;
- private final Queue<Entry<T>> queue;
- private final SizeClass sizeClass;
- MemoryRegionCache(int size, SizeClass sizeClass) {
- this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
- queue = PlatformDependent.newFixedMpscQueue(this.size);
- this.sizeClass = sizeClass;
- }
- ...
-
- static final class Entry<T> {
- final Handle<Entry<?>> recyclerHandle;
- PoolChunk<T> chunk;
- long handle = -1;
- Entry(Handle<Entry<?>> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
- void recycle() {
- chunk = null;
- handle = -1;
- recyclerHandle.recycle(this);
- }
- }
- }
- }
- abstract class PoolArena<T> implements PoolArenaMetric {
- enum SizeClass {
- Tiny,
- Small,
- Normal
- }
- ...
- }
复制代码 (3)标记连续内存的区段为未使用
标记方式会根据Page级别和SubPage级别进行标记,其中Page级别是根据二叉树来进行标记,SubPage级别是通过位图进行标记。- public class PooledByteBufAllocator extends AbstractByteBufAllocator {
- private final PoolThreadLocalCache threadCache;
- private final PoolArena<byte[]>[] heapArenas;//一个线程会和一个PoolArena绑定
- private final PoolArena<ByteBuffer>[] directArenas;//一个线程会和一个PoolArena绑定
- ...
- @Override
- protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<byte[]> heapArena = cache.heapArena;
- ByteBuf buf;
- if (heapArena != null) {
- //分配堆内存
- buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
- } else {
- buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
- }
- return toLeakAwareBuffer(buf);
- }
- @Override
- protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
- PoolThreadCache cache = threadCache.get();
- PoolArena<ByteBuffer> directArena = cache.directArena;
- ByteBuf buf;
- if (directArena != null) {
- //分配直接内存
- buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- } else {
- if (PlatformDependent.hasUnsafe()) {
- buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
- } else {
- buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
- }
- }
- return toLeakAwareBuffer(buf);
- }
- ...
- }
- abstract class PoolArena<T> implements PoolArenaMetric {
- ...
- PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
- PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建ByteBuf对象
- allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配
- return buf;
- }
-
- private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
- //1.根据reqCapacity进行分段规格化
- final int normCapacity = normalizeCapacity(reqCapacity);
- if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8K
- int tableIdx;
- PoolSubpage<T>[] table;
- boolean tiny = isTiny(normCapacity);
- if (tiny) {//< 512
- //2.进行缓存分配
- if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
- //命中缓存,was able to allocate out of the cache so move on
- return;
- }
- tableIdx = tinyIdx(normCapacity);
- table = tinySubpagePools;
- } else {
- //2.进行缓存分配
- if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
- //命中缓存,was able to allocate out of the cache so move on
- return;
- }
- tableIdx = smallIdx(normCapacity);
- table = smallSubpagePools;
- }
- final PoolSubpage<T> head = table[tableIdx];
- //Synchronize on the head.
- //This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.
- synchronized (head) {
- final PoolSubpage<T> s = head.next;
- if (s != head) {
- assert s.doNotDestroy && s.elemSize == normCapacity;
- long handle = s.allocate();
- assert handle >= 0;
- s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
- if (tiny) {
- allocationsTiny.increment();
- } else {
- allocationsSmall.increment();
- }
- return;
- }
- }
- //没有命中缓存
- allocateNormal(buf, reqCapacity, normCapacity);
- return;
- }
- if (normCapacity <= chunkSize) {//需要分配的内存大于8K,但小于16M
- //2.进行缓存分配
- if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
- //命中缓存,was able to allocate out of the cache so move on
- return;
- }
- //没有命中缓存
- allocateNormal(buf, reqCapacity, normCapacity);
- } else {//需要分配的内存大于16M
- //Huge allocations are never served via the cache so just call allocateHuge
- allocateHuge(buf, reqCapacity);
- }
- }
-
- //根据reqCapacity进行分段规格化
- int normalizeCapacity(int reqCapacity) {
- if (reqCapacity < 0) {
- throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
- }
- if (reqCapacity >= chunkSize) {
- return reqCapacity;
- }
- if (!isTiny(reqCapacity)) { // >= 512
- int normalizedCapacity = reqCapacity;
- normalizedCapacity --;
- normalizedCapacity |= normalizedCapacity >>> 1;
- normalizedCapacity |= normalizedCapacity >>> 2;
- normalizedCapacity |= normalizedCapacity >>> 4;
- normalizedCapacity |= normalizedCapacity >>> 8;
- normalizedCapacity |= normalizedCapacity >>> 16;
- normalizedCapacity ++;
- if (normalizedCapacity < 0) {
- normalizedCapacity >>>= 1;
- }
- return normalizedCapacity;
- }
- if ((reqCapacity & 15) == 0) {
- return reqCapacity;
- }
- return (reqCapacity & ~15) + 16;
- }
- ...
- }
- final class PoolThreadCache {
- final PoolArena<byte[]> heapArena;
- final PoolArena<ByteBuffer> directArena;
- //Hold the caches for the different size classes, which are tiny, small and normal.
- //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
- //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
- //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
- private final MemoryRegionCache<byte[]>[] normalHeapCaches;
- private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
- ...
-
- //Try to allocate a tiny buffer out of the cache. Returns true if successful false otherwise
- boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
- //首先调用cacheForTiny()方法找到需要分配的size对应的MemoryRegionCache
- //然后调用allocate()方法基于MemoryRegionCache去给ByteBuf对象分配内存
- return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
- }
-
- //找到需要分配的size对应的MemoryRegionCache
- private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
- int idx = PoolArena.tinyIdx(normCapacity);
- if (area.isDirect()) {
- return cache(tinySubPageDirectCaches, idx);
- }
- return cache(tinySubPageHeapCaches, idx);
- }
-
- //根据索引去缓存数组中返回一个MemoryRegionCache元素
- private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
- if (cache == null || idx > cache.length - 1) {
- return null;
- }
- return cache[idx];
- }
-
- //基于MemoryRegionCache去给ByteBuf对象分配内存
- private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
- if (cache == null) {
- return false;
- }
- //调用MemoryRegionCache的allocate()方法给buf分配大小为reqCapacity的一块内存
- boolean allocated = cache.allocate(buf, reqCapacity);
- if (++ allocations >= freeSweepAllocationThreshold) {
- allocations = 0;
- trim();
- }
- return allocated;
- }
- ...
- private abstract static class MemoryRegionCache<T> {
- private final int size;
- private final Queue<Entry<T>> queue;
- private final SizeClass sizeClass;
- private int allocations;
- ...
- //Allocate something out of the cache if possible and remove the entry from the cache.
- public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
- //步骤一:从queue队列中弹出一个Entry元素
- Entry<T> entry = queue.poll();
- if (entry == null) {
- return false;
- }
- //步骤二:初始化buf
- initBuf(entry.chunk, entry.handle, buf, reqCapacity);
- //步骤三:将弹出的Entry元素放入对象池中进行复用
- entry.recycle();
- //allocations is not thread-safe which is fine as this is only called from the same thread all time.
- ++ allocations;
- return true;
- }
-
- //Init the PooledByteBuf using the provided chunk and handle with the capacity restrictions.
- protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
-
- static final class Entry<T> {
- final Handle<Entry<?>> recyclerHandle;
- PoolChunk<T> chunk;
- long handle = -1;
- Entry(Handle<Entry<?>> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
- void recycle() {
- chunk = null;
- handle = -1;
- recyclerHandle.recycle(this);
- }
- }
- }
- }
复制代码 (4)将ByteBuf对象添加到对象池
一开始时,对象池是没有PooledByteBuf对象的,当PooledByteBuf对象被释放时不会被立即销毁,而是会加入到对象池里。
这样当Netty每次去拿一个PooledByteBuf对象时,就可以先从对象池里获取,取出对象之后就可以进行内存分配以及初始化了。
考虑到PooledByteBuf对象会经常被申请和释放,如果QPS非常高,可能会产生很多PooledByteBuf对象,而且频繁创建和释放PooledByteBuf对象也会比较耗费资源和降低性能。
所以Netty便使用了对象池来减少GC:当申请PooledByteBuf对象时,就可以尽可能从对象池里去取。当释放PooledByteBuf对象时,则可以将对象添加到对象池,从而实现对象复用。- final class PoolThreadCache {
- final PoolArena<byte[]> heapArena;
- final PoolArena<ByteBuffer> directArena;
- //Hold the caches for the different size classes, which are tiny, small and normal.
- //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
- //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
- private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
- //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
- private final MemoryRegionCache<byte[]>[] normalHeapCaches;
- private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
- private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
- ...
- }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |