> 文章列表 > [Netty] FastThreadLocal (十四)

[Netty] FastThreadLocal (十四)

[Netty] FastThreadLocal (十四)

文章目录

      • 1.FastThreadLocal介绍
      • 2.FastThreadLocal分析
      • 3.FastThreadLocal结构分析
      • 4.FastThreadLocal方法分析
        • 4.1 FastThreadLocal.get()
        • 4.2 FastThreadLocal.set()

1.FastThreadLocal介绍

FastThreadLocal是Netty中常用的一个工具类, FastThreadLocal所使用的InternalThreadLocalMap内部不是采用哈希表, 而是直接通过数组索引的方式返回object, 省去了哈希表的查找过程, 因此效率相比于JDK的ThreadLocal更高。

2.FastThreadLocal分析

每个FastThreadLocal带有一个类型为int的index, 该属性在整个JVM中是全局唯一的, JVM中第一个实例化的FastThreadLocal的index为0, 第二个为1。

public FastThreadLocal() {index = InternalThreadLocalMap.nextVariableIndex();
}public static int nextVariableIndex() {//nextIndex是一个静态变量,每次调用nextVariableIndex()都会自增1,让后赋给FastThreadLocal的index属性int index = nextIndex.getAndIncrement();if (index < 0) {nextIndex.decrementAndGet();throw new IllegalStateException("too many thread-local indexed variables");}return index;
}static final AtomicInteger nextIndex = new AtomicInteger();

InternalThreadLocalMap可以通过FastThreadLocal的index值直接通过数据下标拿到相应的object。

public final V get(InternalThreadLocalMap threadLocalMap) {Object v = threadLocalMap.indexedVariable(index);if (v != InternalThreadLocalMap.UNSET) {return (V) v;}return initialize(threadLocalMap);
}public Object indexedVariable(int index) {Object[] lookup = indexedVariables;return index < lookup.length? lookup[index] : UNSET;
}

如果要使用FastThreadLocal, 线程应该为FastThreadLocalThread, 内部使用InternalThreadLocalMap替换了JDK的ThreadLocalMap。

public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {//如果是FastThreadLocalThread,那么可以直接获取该FastThreadLocalThread的InternalThreadLocalMapreturn fastGet((FastThreadLocalThread) thread);} else {return slowGet();}
}private static InternalThreadLocalMap slowGet() {//如果是普通的Thread,会先通过ThreadLocal找到Thread对应的InternalThreadLocalMap,该ThreadLocal是一个静态变量,在JVM中是唯一的ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;InternalThreadLocalMap ret = slowThreadLocalMap.get();if (ret == null) {ret = new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;
}

FastThreadLocal中有一个特殊的index

private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

这个值在整个JVM中是唯一且不变的, 并且该值也是通过InternalThreadLocalMap.nextVariableIndex()来取值的, 意味着这个值永远是0。
正常的FastThreadLocal的index是从1开始的, 因为InternalThreadLocalMap中index为0的object是一个特殊的object。

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);Set<FastThreadLocal<?>> variablesToRemove;if (v == InternalThreadLocalMap.UNSET || v == null) {variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);} else {variablesToRemove = (Set<FastThreadLocal<?>>) v;}variablesToRemove.add(variable);
}

每个FastThreadLocalThread的InternalThreadLocalMap中index为0的object是一个Set<FastThreadLocal<?>>, 这个set保存了FastThreadLocalThread所用到的所有的FastThreadLocal, 如果要删除FastThreadLocalThread中的所有Object, 直接删除set即可

public static void removeAll() {//获取FastThreadLocalThread的InternalThreadLocalMapInternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();if (threadLocalMap == null) {return;}try {//获取index为variablesToRemoveIndex的object,也就是上面提到的index为0的特殊的object,他是一个SetObject v = threadLocalMap.indexedVariable(variablesToRemoveIndex);if (v != null && v != InternalThreadLocalMap.UNSET) {@SuppressWarnings("unchecked")//将object强转为SetSet<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;//获取该FastThreadLocalThread的所有的FastThreadLocalFastThreadLocal<?>[] variablesToRemoveArray =variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);for (FastThreadLocal<?> tlv: variablesToRemoveArray) {//依次调用这些FastThreadLocal的remove方法tlv.remove(threadLocalMap);}}} finally {//最后将该FastThreadLocal的InternalThreadLocalMap置为nullInternalThreadLocalMap.remove();}
}

FastThreadLocal.removeAll()方法会在DefaultThreadFactory中被调用, 通过DefaultThreadFactory这个工厂类new出来的Thread都是FastThreadLocalThread。

public void run() {try {r.run();} finally {FastThreadLocal.removeAll();}
}

每个线程在结束后都会调用FastThreadLocal.removeAll(), 这样该线程所有通过FastThreadLocal设置的Object在线程结束后都会被置为null, 避免了内存泄露。

3.FastThreadLocal结构分析

[Netty] FastThreadLocal (十四)

  1. InternalThreadLocalMap中并不是Entry的key-value结构, 而是Object数组
  2. 索引0位置存放FastThreadLocal的Set集合, 其他索引位置初始化为UNSET, 数据存入的时候更新为具体的Object
  3. FastThreadLocal中包含一个自增的index表示在InternalThreadLocalMap的数组中的索引位置
  4. Set<FastThreadLocal<?>>结构中存放FastThreadLocal的引用, 更容易解决内存泄漏的问题

4.FastThreadLocal方法分析

public class FastThreadLocalTest {private static FastThreadLocal<Object> threadLocal =new FastThreadLocal<Object>(){@Overrideprotected Object initialValue() throws Exception {return new Object();}};// 每个线程拿到的对象都是线程独享// 线程对对象的修改不会影响其他线程public static void main(String[] args) {new Thread(() -> {// 1.获取ThreadLocalMap// 2.直接通过索引取出对象// 3.初始化对象, 如果没有对象的话Object o = threadLocal.get();System.out.println(o);while (true){// 1.获取ThreadLocalMap// 2.直接通过索引set对象// 3.removethreadLocal.set(new Object());try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(() -> {Object o = threadLocal.get();System.out.println(o);while (true){System.out.println(threadLocal.get() == o);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}
  • get方法
  • set方法

4.1 FastThreadLocal.get()

    public final V get() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();Object v = threadLocalMap.indexedVariable(index);if (v != InternalThreadLocalMap.UNSET) {return (V) v;}return initialize(threadLocalMap);}public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型return fastGet((FastThreadLocalThread) thread);} else {return slowGet();}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性if (threadLocalMap == null) {thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());}return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMapif (ret == null) {ret = new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;
}
  1. 获取ThreadLocalMap

    • FastThreadLocalThread: fastGet() 方法获取 FastThreadLocalThread 的threadLocalMap 属性。
    • ThreadLocal: slowGet() 方法获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。
  2. 直接通过索引取出对象
    [Netty] FastThreadLocal (十四)

  3. 初始化对象, 如果没有对象的话
    [Netty] FastThreadLocal (十四)

4.2 FastThreadLocal.set()

public final void set(V value) {if (value != InternalThreadLocalMap.UNSET) { // 1\\. value 是否为缺省值InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2\\. 获取当前线程的 InternalThreadLocalMapsetKnownNotUnset(threadLocalMap, value); // 3\\. 将 InternalThreadLocalMap 中数据替换为新的 value} else {remove();}
}
  1. 判断 value 是否为缺省值
  2. 获取当前线程的InternalThreadLocalMap
  3. 将 InternalThreadLocalMap 中对应数据替换为新的 value
  4. remove

setKnownNotUnset(): 将数据添加到 InternalThreadLocalMap

    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {if (threadLocalMap.setIndexedVariable(index, value)) {addToVariablesToRemove(threadLocalMap, this);}}public boolean setIndexedVariable(int index, Object value) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object oldValue = lookup[index];lookup[index] = value;return oldValue == UNSET;} else {// 扩容expandIndexedVariableTableAndSet(index, value);return true;}}private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray = indexedVariables;final int oldCapacity = oldArray.length;int newCapacity = index;newCapacity |= newCapacity >>>  1;newCapacity |= newCapacity >>>  2;newCapacity |= newCapacity >>>  4;newCapacity |= newCapacity >>>  8;newCapacity |= newCapacity >>> 16;newCapacity ++;Object[] newArray = Arrays.copyOf(oldArray, newCapacity);Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);newArray[index] = value;indexedVariables = newArray;}private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素Set<FastThreadLocal<?>> variablesToRemove;if (v == InternalThreadLocalMap.UNSET || v == null) {variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 创建 FastThreadLocal 类型的 Set 集合threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置} else {variablesToRemove = (Set<FastThreadLocal<?>>) v; // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合}variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中
}
  1. 找到数组下标 index 位置

    1. 如果数组容量大于 FastThreadLocal 的 index 索引, 直接找到数组下标 index 位置将新 value 设置进去
    2. 在设置新的 value 之前, 将index 位置的元素取出, 如果旧元素还是UNSET缺省对象, 返回成功
  2. expandIndexedVariableTableAndSet(): 扩容

  3. addToVariablesToRemove(): 向 InternalThreadLocalMap 添加完数据之后, 将 FastThreadLocal 对象保存到待清理的 Set 中

remove():

public final void remove() {remove(InternalThreadLocalMap.getIfSet());
}
public static InternalThreadLocalMap getIfSet() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return ((FastThreadLocalThread) thread).threadLocalMap();}return slowThreadLocalMap.get();
}
public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap == null) {return;}Object v = threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 valueremoveFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocalif (v != InternalThreadLocalMap.UNSET) {try {onRemoval((V) v); // 空方法,用户可以继承实现} catch (Exception e) {PlatformDependent.throwException(e);}}
}
  1. 调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap
    • 如果是 FastThreadLocalThread 类型, 直接取 FastThreadLocalThread 中 threadLocalMap 属性
    • 如果是普通Thread, 从 ThreadLocal 类型的 slowThreadLocalMap 中获取
  2. InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素, 覆盖为缺省对象 UNSET
  3. 清理当前的 FastThreadLocal 对象, InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合, 删除当前FastThreadLocal