> 文章列表 > [Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

文章目录

      • 1.ByteBufAllocator 内存管理器
      • 2.UnpooledByteBufAllocator
        • 2.1 heap内存的分配
        • 2.2 direct内存的分配
      • 3.PooledByteBufAllocator
        • 3.1 heap内存和direct内存的分配
        • 3.2 directArena分配direct内存的流程
        • 3.3 内存规格的介绍
      • 4.缓存的相关问题
        • 4.1 缓存的数据结果
        • 4.2 命中缓存的分配流程
      • 5.PoolThreadCache的相关问题
        • 5.1 Arena数据结构分析
        • 5.2 Page级别的内存分配
        • 5.3 Subpage级别的内存分配

1.ByteBufAllocator 内存管理器

ByteBuf分类:

  • pooled和unpooled
  • heap和direct
  • unsafe和非unsafe

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

通过不同的方法去读取到数据。

    @Overridepublic ByteBuf buffer() {if (directByDefault) {return directBuffer();}return heapBuffer();}

堆内存分类

    @Overridepublic ByteBuf heapBuffer() {return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);}@Overridepublic ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}validate(initialCapacity, maxCapacity);return newHeapBuffer(initialCapacity, maxCapacity);}

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • PooledByteBufAllocator: 通过在预先分配好的内存中分配数据
  • UnpooledByteBufAllocator: 从操作系统中直接分配内存数据

2.UnpooledByteBufAllocator

  • newHeapBuffer
  • newDirectBuffer

2.1 heap内存的分配

UnpooledByteBufAllocator.newHeapBuffer()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

判断是否是Unsafe, 如果有Unsafe的辅助, 为InstrumentedUnpooledUnsafeHeapByteBuf, 如果没有则为InstrumentedUnpooledHeapByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

一个是UnpooledUnsafeHeapByteBuf, 一个是UnpooledHeapByteBuf, 不过UnpooledUnsafeHeapByteBuf是继承UnpooledHeapByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnpooledUnsafeHeapByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

这里的UnpooledUnsafeHeapByteBufUnpooledHeapByteBuf区别在于_getByte()以及一些类似的方法, 区别在于工具类的使用上

UnpooledUnsafeHeapByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnpooledHeapByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

一个是UnsafeByteBufUtil, 一个是HeapByteBufUtil

UnsafeByteBufUtil

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

HeapByteBufUtil

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnsafeByteBufUtil是通过对象加偏移量的方式获取数据的, 而HeapByteBufUtil是通过数组获取数据的。前一种的效率高一些。

2.2 direct内存的分配

UnpooledByteBufAllocator.newDirectBuffer()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • InstrumentedUnpooledDirectByteBuf: UnpooledDirectByteBuf
  • InstrumentedUnpooledUnsafeDirectByteBuf: UnpooledUnsafeDirectByteBuf
  • InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf: UnpooledUnsafeNoCleanerDirectByteBuf, UnpooledUnsafeDirectByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnpooledDirectByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnpooledDirectByteBuf.setByteBuffer()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

UnpooledUnsafeDirectByteBuf.setByteBuffer()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

先是调用了子类的方法, 然后调用Unsafe拿到对应的数据。

其次是_getByte()等方法的不同, Unsafe会通过一个内存地址加偏移量获取数据, 非unsafe通过数组下标获取数据的。

3.PooledByteBufAllocator

  • newDirectBuffer
  • newHeapBuffer

3.1 heap内存和direct内存的分配

    private final PoolThreadLocalCache threadCache;@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 获取线程局部缓存PoolThreadCachePoolThreadCache cache = threadCache.get();// 从局部缓存中获取到heap竞技场部分 PoolArena<byte[]> heapArena = cache.heapArena;final ByteBuf buf;if (heapArena != null) {// 主要的进行分配的过程buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {buf = PlatformDependent.hasUnsafe() ?new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}return toLeakAwareBuffer(buf);}
  • 获取线程局部缓存PoolThreadCache, 从PoolThreadLocalCache 获取到
  • 从局部缓存中获取到heap竞技场部分PoolArena
  • 从Arena上进行内存分配

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

这里的nDirectArena为2倍的cpu核数, 为的是之前创建的NioEventLoop的数量对应

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

PooledByteBufAllocator的结构, 每一个Thread和Area对应, 都是2倍Cpu核数

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

线程局部缓存PoolThreadCache

  • tinyCacheSize
  • smallCacheSize
  • normalCacheSize

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

3.2 directArena分配direct内存的流程

directArena.allocate(cache, initialCapacity, maxCapacity)

  1. 从对象池中获取PooledByteBuf进行复用
  2. 从缓存上进行内存分配
  3. 从内存堆中进行内存分配

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • Unsafe: PooledUnsafeDirectByteBuf
  • 非Unsafe: PooledDirectByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • 轻量级对象池获取ByteBuf
  • ByteBuf复用方法

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

allocate()

分配内存逻辑

   private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];/* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);incTinySmallAllocation(tiny);return;}}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);++allocationsNormal;}} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}

3.3 内存规格的介绍

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  1. 0 - 512B 为tinySize, 单位为SubPage
  2. 512B - 8K 为smallSize, 单位为SubPage
  3. 8K - 16M 为normalSize, 单位为Page
  4. 16M以上 为hugeSize, 单位为Chunk

4.缓存的相关问题

4.1 缓存的数据结果

MemoryRegionCache

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • queue: chunk handler执行逻辑
  • sizeClass: tiny, small, normal
  • size: 16B, 512B, 1K, 2K, 4K, 8K, 16K, 32K

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • tiny的数组大小为32: 16B - 496B
  • small的数组大小为4: 512BM 1K 2K 4K
  • normal的数组大小为3: 8K 16K 32K

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

通过计算和传参获取到数组大小和数组的单位的大小

4.2 命中缓存的分配流程

分配内存逻辑中存在名字缓存的分配流程。

   private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512// 命中缓存的分配流程if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {// 命中缓存的分配流程if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];/* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);incTinySmallAllocation(tiny);return;}}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);++allocationsNormal;}} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  1. 找到对应size的MemoryRegionCache
  2. 从queue中弹出一个entry给ByteBuf初始化
  3. 将弹出的entry扔到对象池进行复用

找到对应size的MemoryRegionCache

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

从queue中弹出一个entry给ByteBuf初始化

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

初始化

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

将弹出的entry扔到对象池进行复用

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

5.PoolThreadCache的相关问题

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  1. cache: 之前的MemoryRegionCache
  2. arena: PoolArena, PoolChunkList, PoolChunk, PoolSubpage

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

5.1 Arena数据结构分析

Arena: ChunkList的双向链表 -> Chunk的双向链表

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

Chunk -> subpage[]数组
[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

5.2 Page级别的内存分配

allocateNormal()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  1. 尝试在现有的chunk上分配
  2. 创建一个chunk进行分配
  3. 初始化pooledByteBuf: c.allocate()

创建chunk分配 PoolChunk

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

    PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {unpooled = false;this.arena = arena;this.memory = memory;this.pageSize = pageSize;this.pageShifts = pageShifts;this.maxOrder = maxOrder;this.chunkSize = chunkSize;this.offset = offset;unusable = (byte) (maxOrder + 1);log2ChunkSize = log2(chunkSize);subpageOverflowMask = ~(pageSize - 1);freeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;maxSubpageAllocs = 1 << maxOrder;memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a timeint depth = 1 << d;for (int p = 0; p < depth; ++ p) {// in each level traverse left to right and set value to the depth of subtreememoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}subpages = newSubpageArray(maxSubpageAllocs);cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);}

handle: 对应chunk中第几个page的第几个subpage, 就是拿到内存里的哪一个连续内存

初始化PooledByteBuf

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

5.3 Subpage级别的内存分配

allocateSubpage()

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)

  • 定位到Subpage对象
  • 初始化Subpage: 划分page
  • 初始化PooledByteBuf: 和page级别的内存分配一个代码