Netty - FastThreadLocal

最后更新:2019-01-11

在java线程中,每个线程都有一个ThreadLocalMap实例变量(如果不使用ThreadLocal,不会创建这个Map,一个线程第一次访问某个ThreadLocal变量时,才会创建)。该Map是使用线性探测的方式解决hash冲突的问题,如果没有找到空闲的slot,就不断往后尝试,直到找到一个空闲的位置,插入entry,这种方式在经常遇到hash冲突时,影响效率。

FastThreadLocal直接使用数组避免了hash冲突的发生,具体做法是:每一个FastThreadLocal实例创建时,分配一个下标index;分配index使用AtomicInteger实现,每个FastThreadLocal都能获取到一个不重复的下标。当调用ftl.get()方法获取值时,直接从数组获取返回,如return array[index],如下图:

Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。下面我们看下这两个类是如何实现的。

FastThreadLocalThread 是对 Thread 类的一层包装,每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时,才能发挥 FastThreadLocal 的性能优势。首先看下 FastThreadLocalThread 的源码定义:

public class FastThreadLocalThread extends Thread {
    // This will be set to true if we have a chance to wrap the Runnable.
    private final boolean cleanupFastThreadLocals;

    private InternalThreadLocalMap threadLocalMap;
    ...
}

可以看出 FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap

接下来看下 InternalThreadLocalMap 的内部构造。

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(InternalThreadLocalMap.class);
    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
    private static final AtomicInteger nextIndex = new AtomicInteger();

    public static final Object UNSET = new Object();

    /** Used by {@link FastThreadLocal} */
    private Object[] indexedVariables;
    
    /**
     * 用于标识FastThreadLocal变量是否注册了cleaner
     * BitSet简要原理:
     * BitSet默认底层数据结构是一个long[]数组,开始时长度为1,即只有long[0],而一个long有64bit。
     * 当BitSet.set(1)的时候,表示将long[0]的第二位设置为true,即0000 0000 ... 0010(64bit),则long[0]==2
     * 当BitSet.get(1)的时候,第二位为1,则表示true;如果是0,则表示false
     * 当BitSet.set(64)的时候,表示设置第65位,此时long[0]已经不够用了,扩容处long[1]来,进行存储
     *
     * 存储类似 {index:boolean} 键值对,用于防止一个FastThreadLocal多次启动清理线程
     * 将index位置的bit设为true,表示该InternalThreadLocalMap中对该FastThreadLocal已经启动了清理线程
     */
    private BitSet cleanerFlags;
}

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。

private final int index;

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}

InternalThreadLocalMap.nextVariableIndex();的实现

private static final AtomicInteger nextIndex = new AtomicInteger();

public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}

每个FastThreadLocal实例以步长为1的递增序列,获取index值,这保证了InternalThreadLocalMap中数组的长度不会突增。

下面我们来看一下FastThreadLocal的get方法

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    // 如果获取到的值不是UNSET,那么是个有效的值,直接返回。如果是UNSET,则初始化。
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }

    return initialize(threadLocalMap);
}
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    // 当前线程是否为 FastThreadLocalThread 类型
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
 	// 获取 FastThreadLocalThread 的 threadLocalMap 属性
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}
private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
            
private static InternalThreadLocalMap slowGet() {
	// 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。关于 InternalThreadLocalMap 的初始化在上文中已经介绍过,它会初始化一个长度为 32 的 Object 数组,数组中填充着 32 个缺省对象 UNSET 的引用。

接下来看一下初始化方法initialize

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
    	// 获取初始值
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
	// 保存到数组中,如果长度不够扩容
    threadLocalMap.setIndexedVariable(index, v);
    // 将 FastThreadLocal 对象保存到待清理的 Set 中
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
	// 获取数组下标为 0 的元素
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
    	// 创建 FastThreadLocal 类型的 Set 集合
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        // 将 Set 集合填充到数组下标 0 的位置
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
   		 // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
	// 将 FastThreadLocal 添加到 Set 集合中
    variablesToRemove.add(variable);
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。

为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合呢?这时候我们回过头看下 remove() 方法。

public final void remove() {
    remove(InternalThreadLocalMap.getIfSet());
}

@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

	// 删除数组下标 index 位置对应的 value
    Object v = threadLocalMap.removeIndexedVariable(index);
    // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocal
    removeFromVariablesToRemove(threadLocalMap, this);
	
    if (v != InternalThreadLocalMap.UNSET) {
        try {
        	// 空方法,用户可以继承实现
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

在执行 remove 操作之前,会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。找到 InternalThreadLocalMap 之后,InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素,并将 index 位置的元素覆盖为缺省对象 UNSET。接下来就需要清理当前的 FastThreadLocal 对象,此时 Set 集合就派上了用场,InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合,然后删除当前 FastThreadLocal。

set方法

public final void set(V value) {
	// 1. value 是否为缺省值
    if (value != InternalThreadLocalMap.UNSET) {
    	// 2. 获取当前线程的 InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        // 3. 将 InternalThreadLocalMap 中数据替换为新的 value
        setKnownNotUnset(threadLocalMap, value);
    } else {
        remove();
    }
}

FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  • 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。

  • 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。

  • 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。

Edgar

Edgar
一个略懂Java的小菜比