huxipeng

     这一节我们一起看下分配过程

1     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
2         PooledByteBuf<T> buf = newByteBuf(maxCapacity); // 初始化一块容量为 2^31 - 1的ByteBuf
3         allocate(cache, buf, reqCapacity); // reqCapacity = 1024  进入分配逻辑
4         return buf;
5     }
 1     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
 2         final int normCapacity = normalizeCapacity(reqCapacity); // 对请求的大小进行校准,一会具体看
 3         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 判断校准之后的请求大小 是否小于 8k
 4             int tableIdx;
 5             PoolSubpage<T>[] table;
 6             boolean tiny = isTiny(normCapacity); // 判断请求大小是否小于512B
 7             if (tiny) { // < 512
 8                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // 从缓存中进行分配,如果是第一次这里肯定是没有对应大小缓存的,除非之前有过释放,然后内存空间被缓存起来了。
 9                     // was able to allocate out of the cache so move on
10                     return;
11                 }
12                 tableIdx = tinyIdx(normCapacity); // tiny数组中获取对应的下标 
13                 table = tinySubpagePools; // tiny subPage数组
14             } else { // 512 <=  normCapacity  < 8K
15                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // 一样从缓存中进行分配,这个上一节详细的介绍过
16                     // was able to allocate out of the cache so move on
17                     return;
18                 }
19                 tableIdx = smallIdx(normCapacity); // 获取small subPage数组下标  
20                 table = smallSubpagePools; // subPage 数组
21             }
22 
23             final PoolSubpage<T> head = table[tableIdx];// 取得对应下标的subPage
24 
25             /**
26              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
27              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
28              */
29             synchronized (head) { // 锁定防止其他操作修改head结点
30                 final PoolSubpage<T> s = head.next;
31                 if (s != head) {// 如果双向链表已经初始化,那么从subpage中进行分配
32                     assert s.doNotDestroy && s.elemSize == normCapacity; // 断言确保 当前的这个subPage中的elemSize大小必须和校对后的请求大小一样
33                     long handle = s.allocate(); // 从subPage中进行分配
34                     assert handle >= 0;
35                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
36                     incTinySmallAllocation(tiny);
37                     return;
38                 }
39             }
40             synchronized (this) {
41                 allocateNormal(buf, reqCapacity, normCapacity); // 双向循环链表还没初始化,使用normal分配
42             }
43 
44             incTinySmallAllocation(tiny);
45             return;
46         }
47         if (normCapacity <= chunkSize) {
48             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
49                 // was able to allocate out of the cache so move on
50                 return;
51             }
52             synchronized (this) {
53                 allocateNormal(buf, reqCapacity, normCapacity);
54                 ++allocationsNormal;
55             }
56         } else {
57             // Huge allocations are never served via the cache so just call allocateHuge
58             allocateHuge(buf, reqCapacity);
59         }
60     }
    int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }

        if (reqCapacity >= chunkSize) {
            return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
        }

        if (!isTiny(reqCapacity)) { //  如果是 >=512 的内存申请,那么就规范到2的次方大小 ,比如 1000 就会 变成 1024,自己可以把这段位操作,自己main方法试一下。
            // Doubled

            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;

            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

            return normalizedCapacity;
        }

        if (directMemoryCacheAlignment > 0) {
            return alignCapacity(reqCapacity);
        }

        // Quantum-spaced
        if ((reqCapacity & 15) == 0) { // 如果 < 512 判断如果是16的倍数,那么直接返回
            return reqCapacity;
        }

        return (reqCapacity & ~15) + 16; // 如果不是则对齐成16的倍数
    }
 private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
     if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
         q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
         q075.allocate(buf, reqCapacity, normCapacity)) {
         return;
     }  // 先尝试从chunk链表中进行分配, 这里有一个需要注意的地方
     // 分配顺序 先分配 q50 也就是 50% ~ 100% 的,然后是 q25 也就是 25% ~ 75% 的, 然后是 q000 0%~50% 、 qInit 0%~25% 、 q75 75% ~ 100%

     // Add a new chunk.
     PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); // 新建一个Chunk 
     long handle = c.allocate(normCapacity); // 内存分配
     assert handle > 0;
     c.initBuf(buf, handle, reqCapacity);
     qInit.add(c); // 添加到初始化的chunk链表
 }

上面的分配顺序,大家想一下为什么不是从q000开始分配呢?我找了一段分析的很好的。

在分析PoolChunkList的时候,我们知道一个chunk随着内存的不停释放,它本身会不停的往其所在的chunk list的prev list移动,直到其完全释放后被回收。 如果这里是从q000开始尝试分配,虽然分配的速度可能更快了(因为分配成功的几率更大),但一个chunk在使用率为25%以内时有更大几率再分配,也就是一个chunk被回收的几率大大降低了。这样就带来了一个问题,我们的应用在实际运行过程中会存在一个访问高峰期,这个时候内存的占用量会是平时的几倍,因此会多分配几倍的chunk出来,而等高峰期过去以后,由于chunk被回收的几率降低,内存回收的进度就会很慢(因为没被完全释放,所以无法回收),内存就存在很大的浪费。

为什么是从q050开始尝试分配呢,q050是内存占用50%~100%的chunk,猜测是希望能够提高整个应用的内存使用率,因为这样大部分情况下会使用q050的内存,这样在内存使用不是很多的情况下一些利用率低(<50%)的chunk慢慢就会淘汰出去,最终被回收。然而为什么不是从qinit中开始呢,这里的chunk利用率低,但又不会被回收,岂不是浪费?q075,q100由于使用率高,分配成功的几率也会更小,因此放到最后。

上面也有提到新建一个chunk,这里我想特殊说明两个byte数组,数组的大小是4096

        memoryMap = new byte[maxSubpageAllocs << 1];
        depthMap = new byte[memoryMap.length];
        int memoryMapIndex = 1;
        for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
            int depth = 1 << d;
            for (int p = 0; p < depth; ++ p) {
                // in each level traverse left to right and set value to the depth of subtree
                memoryMap[memoryMapIndex] = (byte) d;
                depthMap[memoryMapIndex] = (byte) d;
                memoryMapIndex ++;
            }
        }

循环之后的数组是怎么样子的呢?

MemoryMap存放分配信息,depthMap存放节点的高度信息 ;初始状态时,memoryMap和depthMap相等,depthMap的值初始化后不再改变,memoryMap的值则随着节点分配而改变。

下标代表了平衡二叉树的节点序号,值代表该节点的高度值。

 比如MemoryMap[1024] = 10, MemoryMap[2048] = 11. 好了,这个说完了,我们继续看:

1     long allocate(int normCapacity) {
2         if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize  >= 8K
3             return allocateRun(normCapacity);
4         } else {
5             return allocateSubpage(normCapacity); // < 8K
6         }
7     }
 1     private long allocateSubpage(int normCapacity) {
 2         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
 3         // This is need as we may add it back and so alter the linked-list structure.
 4         PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // 根据请求内存大小,获取下标,然后获取对应的subPage
 5         synchronized (head) {
 6             int d = maxOrder; // 最大层数 11
 7             int id = allocateNode(d);  // 分配节点,修改涉及节点层数, 具体看下面的分析
 8             if (id < 0) { // 无可分配节点
 9                 return id;
10             }
11 
12             final PoolSubpage<T>[] subpages = this.subpages;
13             final int pageSize = this.pageSize; 
14 
15             freeBytes -= pageSize;// 16M(初始化值,后面会逐渐减少) - 8K
16 
17             int subpageIdx = subpageIdx(id); // 2048 对应的偏移量是 0 , 2049 对应的偏移量是 1 。。。4095 对应偏移量是 2047 
18             PoolSubpage<T> subpage = subpages[subpageIdx]; // 取对应下标的 subpage
19             if (subpage == null) {
20                 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); // 创建PoolSubpage
21                 subpages[subpageIdx] = subpage;
// 新建或初始化subpage并加入到chunk的subpages数组,同时将subpage加入到arena的subpage双向链表中,最后完成分配请求的内存。
// 代码中,subpage != null的情况产生的原因是:subpage初始化后分配了内存,但一段时间后该subpage分配的内存释放并从arena的双向链表中删除,
// 此时subpage不为null,当再次请求分配时,只需要调用init()将其加入到areana的双向链表中即可。
22 } else { 23 subpage.init(head, normCapacity); 24 } 25 return subpage.allocate(); // 最后结果是一个long整数,其中低32位表示二叉树中的分配的节点,高32位表示subPage中分配的具体位置 26 } 27 }

 修改节点对应的层数,底下这个方法涉及了很多的位运算,这里大家要仔细琢磨。

 1     private int allocateNode(int d) { // d = 11
 2         int id = 1;
 3         int initial = - (1 << d); // -2048
 4         byte val = value(id); // memoryMap[id] , 一般1对应的层数是 0 ,当第一个节点被分配完成后,这个节点的值会变成12
 5         if (val > d) { // unusable // 如果分配完成,那么无法进行分配,那么就是 12 > 11 ,直接返回 -1 
 6             return -1;
 7         }
 8         while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
 9             id <<= 1; // 左移1位
10             val = value(id); // 取到节点对应的层数
11             if (val > d) { // 如果当前的值大于层数,也就是说左子节点用完了
12                 id ^= 1; // 当前节点值 +1 ,取右节点
13                 val = value(id); // 取右子节点
14             }
15         }
16         byte value = value(id); 
17         assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
18                 value, id & initial, d);
19         setValue(id, unusable); // mark as unusable //将该节点的值设置为12,不可用
20         updateParentsAlloc(id); // 将该节点对应的所有父节点,层数全部 + 1
21         return id; 
22     }
 1     private void updateParentsAlloc(int id) {
 2         while (id > 1) {
 3             int parentId = id >>> 1; // 取父级节点
 4             byte val1 = value(id); // 
 5             byte val2 = value(id ^ 1); // 如果 id = 2048 那么 id ^ 1 = 2049 ,  如果 id = 2049 这里  id ^ 1 = 2048 这里一定要注意!!!
 6             byte val = val1 < val2 ? val1 : val2; 
 7             setValue(parentId, val);  // 修改父级对应层数
 8             id = parentId; 
 9         }
10     }

上面这个方法很有意思,大家要仔细体会,我举个例子

 最开始是这样的,我们开始第一次分配,那么就是2048号节点,分配完成后会变成

 

 这时候由于2048变成了12,不可用状态,那么取右节点进行分配

    下一次再进行分配的时候, 发现 1024 是 12 ,那么取右子节点, 是 1025 = 10 ,因为 10 < 11 所以再次进入循环,找到2050节点,对应层数是 11 < 12 ,则分配 2050节点分配后变成

  

 然后依次类推。

1     private long allocateRun(int normCapacity) {
2         int d = maxOrder - (log2(normCapacity) - pageShifts); // 计算满足需求的节点的高度 比如 16K , 那么 就是 d= 11 - (14 - 13) = 10
3         int id = allocateNode(d); // 从第十层找空闲节点, 原理跟上面相同,就不说了
4         if (id < 0) {
5             return id;
6         }
7         freeBytes -= runLength(id); // 分配后剩余的字节数
8         return id;
9     }

说到这里,内存模型的重点部分算是说完了。如果整个Netty系列有任何不对的地方,欢迎大家指出,我会虚心改正。

      后面呢,我将打算将Spring的源码的关键部分进行分析,期间也会穿插着对Redis的底层数据结构等重要内容进行分析。有需要的童鞋们,记得关注我,随时可以获取最新的消息。

谢谢。

    

 

分类:

技术点:

相关文章: