[Netty源码] ByteBufAllocator内存管理器相关问题 (十一)
文章目录
-
-
- 1.ByteBufAllocator 内存管理器
- 2.UnpooledByteBufAllocator
-
- 2.1 heap内存的分配
- 2.2 direct内存的分配
- 3.PooledByteBufAllocator
-
- 3.1 heap内存和direct内存的分配
- 3.2 directArena分配direct内存的流程
- 3.3 内存规格的介绍
- 4.缓存的相关问题
-
- 4.1 缓存的数据结果
- 4.2 命中缓存的分配流程
- 5.PoolThreadCache的相关问题
-
- 5.1 Arena数据结构分析
- 5.2 Page级别的内存分配
- 5.3 Subpage级别的内存分配
-
1.ByteBufAllocator 内存管理器
ByteBuf分类:
- pooled和unpooled
- heap和direct
- unsafe和非unsafe
通过不同的方法去读取到数据。
@Overridepublic ByteBuf buffer() {if (directByDefault) {return directBuffer();}return heapBuffer();}
堆内存分类
@Overridepublic ByteBuf heapBuffer() {return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);}@Overridepublic ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}validate(initialCapacity, maxCapacity);return newHeapBuffer(initialCapacity, maxCapacity);}
- PooledByteBufAllocator: 通过在预先分配好的内存中分配数据
- UnpooledByteBufAllocator: 从操作系统中直接分配内存数据
2.UnpooledByteBufAllocator
- newHeapBuffer
- newDirectBuffer
2.1 heap内存的分配
UnpooledByteBufAllocator.newHeapBuffer()
判断是否是Unsafe, 如果有Unsafe的辅助, 为InstrumentedUnpooledUnsafeHeapByteBuf
, 如果没有则为InstrumentedUnpooledHeapByteBuf
一个是UnpooledUnsafeHeapByteBuf
, 一个是UnpooledHeapByteBuf
, 不过UnpooledUnsafeHeapByteBuf
是继承UnpooledHeapByteBuf
的
UnpooledUnsafeHeapByteBuf
这里的UnpooledUnsafeHeapByteBuf
和UnpooledHeapByteBuf
区别在于_getByte()以及一些类似的方法, 区别在于工具类的使用上
UnpooledUnsafeHeapByteBuf
UnpooledHeapByteBuf
一个是UnsafeByteBufUtil
, 一个是HeapByteBufUtil
UnsafeByteBufUtil
HeapByteBufUtil
UnsafeByteBufUtil是通过对象加偏移量的方式获取数据的, 而HeapByteBufUtil是通过数组获取数据的。前一种的效率高一些。
2.2 direct内存的分配
UnpooledByteBufAllocator.newDirectBuffer()
- InstrumentedUnpooledDirectByteBuf: UnpooledDirectByteBuf
- InstrumentedUnpooledUnsafeDirectByteBuf: UnpooledUnsafeDirectByteBuf
- InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf: UnpooledUnsafeNoCleanerDirectByteBuf, UnpooledUnsafeDirectByteBuf
UnpooledDirectByteBuf
UnpooledDirectByteBuf.setByteBuffer()
UnpooledUnsafeDirectByteBuf.setByteBuffer()
先是调用了子类的方法, 然后调用Unsafe拿到对应的数据。
其次是_getByte()等方法的不同, Unsafe会通过一个内存地址加偏移量获取数据, 非unsafe通过数组下标获取数据的。
3.PooledByteBufAllocator
- newDirectBuffer
- newHeapBuffer
3.1 heap内存和direct内存的分配
private final PoolThreadLocalCache threadCache;@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 获取线程局部缓存PoolThreadCachePoolThreadCache cache = threadCache.get();// 从局部缓存中获取到heap竞技场部分 PoolArena<byte[]> heapArena = cache.heapArena;final ByteBuf buf;if (heapArena != null) {// 主要的进行分配的过程buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {buf = PlatformDependent.hasUnsafe() ?new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}return toLeakAwareBuffer(buf);}
- 获取线程局部缓存PoolThreadCache, 从PoolThreadLocalCache 获取到
- 从局部缓存中获取到heap竞技场部分PoolArena
- 从Arena上进行内存分配
这里的nDirectArena为2倍的cpu核数, 为的是之前创建的NioEventLoop的数量对应
PooledByteBufAllocator的结构, 每一个Thread和Area对应, 都是2倍Cpu核数
线程局部缓存PoolThreadCache
- tinyCacheSize
- smallCacheSize
- normalCacheSize
3.2 directArena分配direct内存的流程
directArena.allocate(cache, initialCapacity, maxCapacity)
- 从对象池中获取PooledByteBuf进行复用
- 从缓存上进行内存分配
- 从内存堆中进行内存分配
- Unsafe: PooledUnsafeDirectByteBuf
- 非Unsafe: PooledDirectByteBuf
- 轻量级对象池获取ByteBuf
- ByteBuf复用方法
allocate()
分配内存逻辑
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];/* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);incTinySmallAllocation(tiny);return;}}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);++allocationsNormal;}} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}
3.3 内存规格的介绍
- 0 - 512B 为tinySize, 单位为SubPage
- 512B - 8K 为smallSize, 单位为SubPage
- 8K - 16M 为normalSize, 单位为Page
- 16M以上 为hugeSize, 单位为Chunk
4.缓存的相关问题
4.1 缓存的数据结果
MemoryRegionCache
- queue: chunk handler执行逻辑
- sizeClass: tiny, small, normal
- size: 16B, 512B, 1K, 2K, 4K, 8K, 16K, 32K
- tiny的数组大小为32: 16B - 496B
- small的数组大小为4: 512BM 1K 2K 4K
- normal的数组大小为3: 8K 16K 32K
通过计算和传参获取到数组大小和数组的单位的大小
4.2 命中缓存的分配流程
分配内存逻辑中存在名字缓存的分配流程。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512// 命中缓存的分配流程if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {// 命中缓存的分配流程if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];/* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);incTinySmallAllocation(tiny);return;}}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);++allocationsNormal;}} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}
- 找到对应size的MemoryRegionCache
- 从queue中弹出一个entry给ByteBuf初始化
- 将弹出的entry扔到对象池进行复用
找到对应size的MemoryRegionCache
从queue中弹出一个entry给ByteBuf初始化
初始化
将弹出的entry扔到对象池进行复用
5.PoolThreadCache的相关问题
- cache: 之前的MemoryRegionCache
- arena: PoolArena, PoolChunkList, PoolChunk, PoolSubpage
5.1 Arena数据结构分析
Arena: ChunkList的双向链表 -> Chunk的双向链表
Chunk -> subpage[]数组
5.2 Page级别的内存分配
allocateNormal()
- 尝试在现有的chunk上分配
- 创建一个chunk进行分配
- 初始化pooledByteBuf: c.allocate()
创建chunk分配 PoolChunk
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {unpooled = false;this.arena = arena;this.memory = memory;this.pageSize = pageSize;this.pageShifts = pageShifts;this.maxOrder = maxOrder;this.chunkSize = chunkSize;this.offset = offset;unusable = (byte) (maxOrder + 1);log2ChunkSize = log2(chunkSize);subpageOverflowMask = ~(pageSize - 1);freeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;maxSubpageAllocs = 1 << maxOrder;memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a timeint depth = 1 << d;for (int p = 0; p < depth; ++ p) {// in each level traverse left to right and set value to the depth of subtreememoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}subpages = newSubpageArray(maxSubpageAllocs);cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);}
handle: 对应chunk中第几个page的第几个subpage, 就是拿到内存里的哪一个连续内存
初始化PooledByteBuf
5.3 Subpage级别的内存分配
allocateSubpage()
- 定位到Subpage对象
- 初始化Subpage: 划分page
- 初始化PooledByteBuf: 和page级别的内存分配一个代码