> 文章列表 > RecvByteBufAllocator内存分配计算

RecvByteBufAllocator内存分配计算

RecvByteBufAllocator内存分配计算

  虽然了解了整个内存池管理的细节,包括它的内存分配的具体逻辑,但是每次从NioSocketChannel中读取数据时,应该分配多少内存去读呢? 例如,客户端发送的数据为1KB , 应该分配多少内存去读呢? 例如: 客户端发送的数据为1KB , 若每次都分配8KB的内存去读取数据,则会导致内存大量浪费,若分配16B的内存去读取数据,那么需要64次才能全部读完, 对性能的有很大的影响 , 那么对于 这个问题,Netty是如何解决的呢?

  NioEventLoop线程在处理OP_READ事件,进入NioByteUnsafe循环读取数据时,使用了两个类来处理内存的分配,一个是ByteBufAllocator, PooledByteBufAllocator为它的默认实现类, 另一个是RecvByteBufAllocator,AdaptiveRecvByteBufAllocator是它的默认实现类,在DefaultChannelConfig初始化时设置 , PooledByteBufAllocator主要用来处理内存分配,并最终委托PoolArena去完成,AdaptiveRecvByteBufAllocator主要用来计算每次读循环时应该分配多少内存,NioByteUnsafe之所有需要循环读取,主要是因为分配的初始ByteBuf不一定能够容纳读取到的所有数据,NioByteUnsafe循环读取的核心代码解读如下 :

public final void read() {// 获取pipeline通道配置,Channel管道final ChannelConfig config = config();// socketChannel已经关闭if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();// 获取内存分配器,默认为PooledByteBufAllocatorfinal ByteBufAllocator allocator = config.getAllocator();// 获取RecvByteBufAllocator内部的计算器Handlefinal RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();// 清空上一次读取的字节数,每次读取时均重新计算// 字节buf分配器, 并计算字节buf分配器HandlerallocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {//当对端发送一个超大的数据包时,TCP会拆包。//        OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。//        如果16次没把数据读完,没有关系,下次select()还会继续处理。//        对于Selector的可读事件,如果你没有读完数据,它会一直返回。do {// 分配内存 ,allocator根据计算器Handle计算此次需要分配多少内存并从内存池中分配//  分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。byteBuf = allocHandle.allocate(allocator);// 读取通道接收缓冲区的数据 , 设置最后一次分配内存大小加上每次读取的字节数// doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)// allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.// 若没有数据可读,则释放内存byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.// 当读到-1时, 表示Channel 通道已经关闭// 没有必要再继续readPending = false;}break;}// 更新读取消息计数器, 递增已经读取的消息数量allocHandle.incMessagesRead(1);readPending = false;// 通知通道处理读取数据,触发Channel管道的fireChannelRead事件pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());// 读取操作完毕 ,读结束后调用,记录此次实际读取到的数据大小,并预测下一次内存分配大小allocHandle.readComplete();// 触发Channel管道的fireChannelReadComplete事件pipeline.fireChannelReadComplete();if (close) {// 如果Socket通道关闭,则关闭读操作closeOnRead(pipeline);}} catch (Throwable t) {// 处理读取异常handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {// 若操作完毕,且没有配置自动读// 则从选择Key兴趣集中移除读操作事件removeReadOp();}}}
}

  每一次创建byteBuf分配内存大小是多大呢? 这个由allocate()方法内部的guess()方法来决定 。

public ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess());
}

  如果是第一次 调用guess()方法,默认分配1024B的内存空间 ,后面分配内存大小动态调节 。

// 实现doReadBytes()方法,从SocketChannel中读取数据。
protected int doReadBytes(ByteBuf byteBuf) throws Exception {// 获取计算内存分配器Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 设置尝试读取字节数组的buf的可写字节数allocHandle.attemptedBytesRead(byteBuf.writableBytes());// 从Channel中读取字节并写入到buf中,返回读取的字节数return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

  在这里,我们需要明白byteBuf.writableBytes()这个方法,writableBytes()方法的返回值为byteBuf中可写的字节数,内部计算方法用byteBuf的容量- byteBuf的写索引得出,而byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());这一行代码,实际上就是将Channel中的数据写入到byteBuf中,返回值为实际写入到ByteBuf中的字节数。
  RecvByteBufAllocator的默认实现类AdaptiveRecvByteBufAllocator是实际的缓冲管理区,这个类可以根据读取到的数据预测所需要的字节的多少,从而自动增加或减少,如果上一次读循环将缓冲区的写满了,那么预测的字节数会变大,如果连续两次循环都不能填满已经分配的缓冲区,则预测字节数会变小。

public void lastBytesRead(int bytes) {// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.// This helps adjust more quickly when large amounts of data is pending and can avoid going back to// the selector to check for more data. Going back to the selector can add significant latency for large// data transfers.//  如果上 一次读循环将缓冲区填充满了,那么预测的字节数会变大if (bytes == attemptedBytesRead()) {// 如果此次读取将缓冲区填充满了,增加一次记录的机会record(bytes);}super.lastBytesRead(bytes);
}// 该方法的参数是一次读取操作中实际读取到的数据大小,将其与nextReceiveBufferSize 进行比较,如果实际字节数actualReadBytes大于等于该值,则立即更新nextReceiveBufferSize ,
// 其更新后的值与INDEX_INCREMENT有关。INDEX_INCREMENT为默认常量,值为4。也就是说在扩容时会一次性增大多一些,以保证下次有足够空间可以接收数据。而相对扩容的策略,
// 缩容策略则实际保守些,常量为INDEX_INCREMENT,值为1,同样也是进行对比, 但不同的是,若实际字节小于所用nextReceiveBufferSize,并不会立马进行大小调整,
// 而是先把 decreaseNow 设置为true,如果下次仍然小于,则才会减少nextReceiveBufferSize的大小
private void record(int actualReadBytes) {// 如果小了两个数量级,则需要缩容if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {if (decreaseNow) {                      // 若减少标识decreaseNow连续两次为true, 则说明下次读取字节数需要减少SIZE_TABLE下标减1index = max(index - INDEX_DECREMENT, minIndex);nextReceiveBufferSize = SIZE_TABLE[index];decreaseNow = false;} else {decreaseNow = true;                     // 第一次减少,只做记录}} else if (actualReadBytes >= nextReceiveBufferSize) {                // 实际读取的字节大小要大于或等于预测值index = min(index + INDEX_INCREMENT, maxIndex);             // SIZE_TABLE 下标 + 4nextReceiveBufferSize = SIZE_TABLE[index];      // 若当前缓存为512,则变成 512 * 2 ^ 4decreaseNow = false;}
}public void lastBytesRead(int bytes) {// 设置最后读取的字节数lastBytesRead = bytes;if (bytes > 0) {// 总读取的字节数totalBytesRead += bytes;}
}

  上述过程中,SIZE_TABLE是什么呢? 请看AdaptiveRecvByteBufAllocator源码实现。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {static final int DEFAULT_MINIMUM = 64;                      // 接收缓冲区的最小长度下限static final int DEFAULT_INITIAL = 1024;                    // 接收缓冲区的最大长度上限static final int DEFAULT_MAXIMUM = 65536;                   // 接收缓冲区最大长度上限// 在调整缓冲区大小时,若是增加缓冲区容量,那么增加的索引值。// 比如,当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要增加容量大小,// 则新缓冲区的大小为SIZE_TABLE[20 + INDEX_INCREMENT],即SIZE_TABLE[24]private static final int INDEX_INCREMENT = 4;               // 扩容增长量// 在调整缓冲区大小时,若是减少缓冲区容量,那么减少的索引值。// 比如,当前缓冲区的大小为SIZE_TABLE[20],若预测下次需要创建的缓冲区需要减小容量大小,// 则新缓冲区的大小为SIZE_TABLE[20 - INDEX_DECREMENT],即SIZE_TABLE[19]private static final int INDEX_DECREMENT = 1;               // 扩容减少量private static final int[] SIZE_TABLE;// 分配了一个int类型的数组,并进行了数组的初始化处理, 从实现来看,该数组的长度是53,前32位是16的倍数,value值是从16开始的,到512,从33位开始,值是前一位的// 两倍,即从1024,2048 , 到最大值 1073741824 。static {List<Integer> sizeTable = new ArrayList<Integer>();for (int i = 16; i < 512; i += 16) {sizeTable.add(i);}for (int i = 512; i > 0; i <<= 1) {sizeTable.add(i);}SIZE_TABLE = new int[sizeTable.size()];for (int i = 0; i < SIZE_TABLE.length; i++) {SIZE_TABLE[i] = sizeTable.get(i);}System.out.println("================");}/*** @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.*/public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();// 入参是一个大小,然后利用二分查找法对该数组进行size定位 ,目标是为了找出该size值在数组中的下标位置 , 主要是为了初始化maxIndex, maxIndex这两个参数private static int getSizeTableIndex(final int size) {for (int low = 0, high = SIZE_TABLE.length - 1; ; ) {if (high < low) {return low;}if (high == low) {return high;}int mid = low + high >>> 1;int a = SIZE_TABLE[mid];int b = SIZE_TABLE[mid + 1];if (size > b) {low = mid + 1;} else if (size < a) {high = mid - 1;} else if (size == a) {return mid;} else {return mid + 1;}}}private final int minIndex;private final int maxIndex;private final int initial;/*** Creates a new predictor with the default parameters.  With the default* parameters, the expected buffer size starts from {@code 1024}, does not* go down below {@code 64}, and does not go up above {@code 65536}.*/public AdaptiveRecvByteBufAllocator() {this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);}/*** Creates a new predictor with the specified parameters.* @param minimum the inclusive lower bound of the expected buffer size* @param initial the initial buffer size when no feed back was received* @param maximum the inclusive upper bound of the expected buffer size*/public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {checkPositive(minimum, "minimum");if (initial < minimum) {throw new IllegalArgumentException("initial: " + initial);}if (maximum < initial) {throw new IllegalArgumentException("maximum: " + maximum);}int minIndex = getSizeTableIndex(minimum);if (SIZE_TABLE[minIndex] < minimum) {this.minIndex = minIndex + 1;} else {this.minIndex = minIndex;}int maxIndex = getSizeTableIndex(maximum);if (SIZE_TABLE[maxIndex] > maximum) {this.maxIndex = maxIndex - 1;} else {this.maxIndex = maxIndex;}this.initial = initial;}@Overridepublic Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);}@Overridepublic AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {super.respectMaybeMoreData(respectMaybeMoreData);return this;}}

  SIZE_TABLE由上述加粗代码进行初始化 。 AdaptiveRecvByteBufAllocator内部维护了一个SIZE_TABLE数组,记录了不同的内存的内存块大小,按照分配需要寻找最合适的内存块,SIZE_TABLE数组中的值为2^n,这样便于软硬件进行处理,SIZE_TABLE数组的初始化与PoolArena中的normalizeCapacity的初始化类似,当需要的内存很小时 , 增长的幅度不大, 当需要的内存较大时, 增长的幅度比较大,因此在[16,512]区间每次增加16,直到512,而从512起,每次翻一倍, 直到int的最大值 。 那size的具体大小值是什么呢?
SIZE_TABLE 数组的toString()打印如下 :
[16B, 32B, 48B, 64B, 80B, 96B, 112B, 128B, 144B, 160B, 176B, 192B, 208B, 224B, 240B, 256B, 272B, 288B, 304B, 320B, 336B, 352B, 368B, 384B, 400B, 416B, 432B, 448B, 464B, 480B, 496B, 512B, 1k, 2k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M, 4M, 8M, 16M, 32M, 64M, 128M, 256M, 512M, 1G]

  当对内部计算器Handle的具体实现类HandleImpl进行初始化时,可根据AdaptiveRecvByteBufAllocator的getSizeTableIndex()二分查找方法获取SIZE_TABLE的下标index并保存,通过SIZE_TABLE[index]获取下次需要分配的缓冲区大小nextReceiveBufferSize并记录,缓冲区的最小容量属性对SIZE_TABLE中的下标为minIndex的值 , 最大容量属性对应的SIZE_TABLE中的下标为maxIndex的值及bool类型标识属性decreaseNow ,这三个属性用于判断下一次创建缓冲区是否需要减少 。
  NioByteUnsafe每次循环完成后会根据实际读取到的字节数和当前缓冲区的大小重新设置下次需要分配的缓冲区的大小。 具体代码如下 。

// 循环读取完后被调用
public void readComplete() {record(totalBytesRead());
}//返回已经读取的字节个数,若‘totalBytesRead < 0’则说明已经读取的字节数已经操作了’Integer.MAX_VALUE’,则返回Integer.MAX_VALUE;否则返回真实的已经读取的字节数。
protected final int totalBytesRead() {return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}

  可以模拟NioByteUnsafe的read()方法,在每次循环开始时, 一定要先重置totalMessages与totalByteRead(清零),读取完成后, readComplete会计算并调整下次预计需要分配的缓冲区的大小, 具体代码如下

public static void main(String[] args) throws Exception {AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle = allocator.newHandle();System.out.println("==============开始 I/O 读事件模拟==============");// 读取循环开始前先重置,将读取的次数和字节数设置为0, 将totalMessages与totalBytesRead设置为0handle.reset(null);System.out.println(String.format("第一次模拟读,需要分配大小 :%d", handle.guess()));handle.lastBytesRead(256);// 调整下次预测值handle.readComplete();// 在每次读取数据时都需要重置totalMessage 与totalBytesReadhandle.reset(null);System.out.println(String.format("第2次花枝招展读,需要分配大小:%d ", handle.guess()));handle.lastBytesRead(256);handle.readComplete();System.out.println("===============连续2次读取的字节数小于默认分配的字节数= =========================");handle.reset(null);System.out.println(String.format("第3次模拟读,需要分配大小 : %d", handle.guess()));handle.lastBytesRead(512);// 调整下次预测值,预测值应该增加到512 * 2 ^ 4handle.readComplete();System.out.println("==================读取的字节数变大 ===============");handle.reset(null);// 读循环中缓冲区的大小System.out.println(String.format("第4次模拟读,需要分配的大小为:%d ", handle.guess()));
}

  结果输出
RecvByteBufAllocator内存分配计算

  当然啦,如果觉得自己已经很明白了,可以看看下面这个例子。

public class Test2 {public static void main(String[] args) {AdaptiveRecvByteBufAllocator allocator = new AdaptiveRecvByteBufAllocator();RecvByteBufAllocator.Handle handle = allocator.newHandle();System.out.println("==============开始 I/O 读事件模拟==============");// 读取循环开始前先重置,将读取的次数和字节数设置为0, 将totalMessages与totalBytesRead设置为0handle.reset(null);System.out.println(String.format("第一次模拟读,需要分配大小 :%d", handle.guess()));handle.lastBytesRead(512);// 调整下次预测值handle.readComplete();// 在每次读取数据时都需要重置totalMessage 与totalBytesReadhandle.reset(null);System.out.println(String.format("第2次花枝招展读,需要分配大小:%d ", handle.guess()));handle.lastBytesRead(512);handle.readComplete();System.out.println("===============连续2次读取的字节数小于默认分配的字节数= =========================");handle.reset(null);System.out.println(String.format("第3次模拟读,需要分配大小 : %d", handle.guess()));}
}

RecvByteBufAllocator内存分配计算
  最后一次结果输出为1024,并没有缩容,源码读到这里,我相信对输出结果已经没有什么意外了。到这里就告一段落,下一篇博客见。

本文对应github地址为
https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git