并发编程-ThreadLocal原理解析及内存泄露问题

ThreadLocal

基本介绍

什么是ThreadLocal?首先我们引用下ThreadLocal的源码注释

This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one has its own, independently initialized copy of the variable.
ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g.,a user ID or Transaction ID).
翻译过来就是:
此类提供线程局部变量。这些变量不同于其他的普通变量,因为每个访问变量的线程都有自己的、独立初始化的变量副本.
ThreadLocal实例通常是使用static和private修饰, 会用来存储用户id或者事务id

定义:
ThreadLocal是一个提供线程局部变量的工具类, 对于一个共享变量,每个线程都有自己独立的变量副本, 实现了线程间的数据隔离, 从而规避了线程数据安全问题

注意这里和synchronized的区别

ThreadLocal和Synchonized都用于解决多线程并发访问。但是ThreadLocal与synchronized有本质的区别。Synchronized用于线程间的数据共享,而ThreadLocal则用于线程间的数据隔离。

应用场景分析

其实很多框架的源码中都用到了ThreadLocal, 我举两个例子,大家可以去看下源码

1,ReentrantReadWriteLock里面的readHolds变量继承了ThreadLocal, 用来存放线程获取读锁的次数
2,Spring中的一些Bean(如RequestContextHolder、TransactionSynchronizationManager、LocaleContextHolder

基本上都是为了存储线程变量的上下文的信息,给大家举个最简单的例子

用户登录后,你需要在项目的任意地方能获取到用户信息,这时如果用ThreadLocal会怎么做?

  • 进入方法前存储用户信息 --> threadLocal.set(“user”)
  • 需要用到用户信息时 --> threadLocal.get()
  • 方法执行完毕后, 清空threadLocal —> threadLocal.remove()

基本介绍

什么是ThreadLocal?首先我们引用下ThreadLocal的源码注释

This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one has its own, independently initialized copy of the variable.
ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (e.g.,a user ID or Transaction ID).
翻译过来就是:
此类提供线程局部变量。这些变量不同于其他的普通变量,因为每个访问变量的线程都有自己的、独立初始化的变量副本.
ThreadLocal实例通常是使用static和private修饰, 会用来存储用户id或者事务id

定义:
ThreadLocal是一个提供线程局部变量的工具类, 对于一个共享变量,每个线程都有自己独立的变量副本, 实现了线程间的数据隔离, 从而规避了线程数据安全问题

注意这里和synchronized的区别

ThreadLocal和Synchonized都用于解决多线程并发访问。但是ThreadLocal与synchronized有本质的区别。Synchronized用于线程间的数据共享,而ThreadLocal则用于线程间的数据隔离。

应用场景分析

其实很多框架的源码中都用到了ThreadLocal, 我举两个例子,大家可以去看下源码

1,ReentrantReadWriteLock里面的readHolds变量继承了ThreadLocal, 用来存放线程获取读锁的次数
2,Spring中的一些Bean(如RequestContextHolder、TransactionSynchronizationManager、LocaleContextHolder

基本上都是为了存储线程变量的上下文的信息,给大家举个最简单的例子

用户登录后,你需要在项目的任意地方能获取到用户信息,这时如果用ThreadLocal会怎么做?

  • 进入方法前存储用户信息 --> threadLocal.set(“user”)
  • 需要用到用户信息时 --> threadLocal.get()
  • 方法执行完毕后, 清空threadLocal —> threadLocal.remove()

源码解析

ThreadLocal的数据结构

并发编程-ThreadLocal原理解析及内存泄露问题

大家可以大概看一下ThreadLocal的数据结构, 可以看到其实每个Thread都会有一个ThreadLocalMap,
ThreadLocal其实就是操作线程的ThreadLocalMap
,ThreadLocalMap的key就是ThreadLocal,value就是需要存储的值.

下面带大家看下set方法的具体过程
大家可以带着这几个疑问

  • ThreadLocalMap的Hash算法是啥样的?
  • 出现了hash冲突是怎么解决的?
  • 扩容是怎么处理的?
  • 为什么要设计WeakReference(弱引用)?

下面根据源码一个个解决这些问题

首先看下set方法

threadLocal.set(T value)

    public void set(T value) {
        //得到当前的线程
        Thread t = Thread.currentThread();
        //得到线程中的ThreadLocal.ThreadLocalMap变量
        ThreadLocalMap map = getMap(t);
        if (map != null)
            //这里是把当前的threadLocal对象作为key放入thread的ThreadLocalMap中
            //注意这里的this就是threadLocal对象
            map.set(this, value);
        else
            //新建一个map,然后放入值
            createMap(t, value);
    }

具体流程:

  • 获取线程中的ThreadLocalMap,每个线程都会有一个ThreadLocalMap变量
  • 如果存在map,直接把当前的ThreadLocal对象和对应的值放入map中
  • 如果不存在就创建一个

createMap(t, value);

然后我们看下初始化map时createMap(t, value); 方法的内部的处理

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
     ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            //新建entry,初始容量为16
            table = new Entry[INITIAL_CAPACITY];
            //注意这里的hash算法,
            //firstKey.threadLocalHashCode其实是nextHashCode.getAndAdd(0x61c88647);
            //0x61c88647是一个黄金分割数,大大降低了hash冲突的概率
            //然后i就是当前key在散列表中对应的数组下标位置
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            //设置负载因子,这里是容量的2/3
            setThreshold(INITIAL_CAPACITY);
        }

从源码可以看到

  • 内部是一个Entry数组
  • ThreadLocalMap的hash算法, 这里的增量是一个黄金分割数,大大减低了hash冲突
  • 负载因子是2/3

map.set(this, value);

下面看下map.set(this, value); 的具体实现

private void set(ThreadLocal<?> key, Object value) {
            //获取之前创建的数组
            Entry[] tab = table;
            int len = tab.length;
            //这里的hash方法和之前的一样
            int i = key.threadLocalHashCode & (len-1);
            //注意这里的写法
            //1,首先找到数组中的i下标  Entry e = tab[i]
            //2,如果i下标是已经有元素了(注意如果这里e == null的话,就直接退出循环执行下一步了)
            //3,从i下标开始遍历数组,直到找到一个==null
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                //当前i下标的元素是!=null的时候才会进来
                //获取的是当前key的值
                ThreadLocal<?> k = e.get();
                //判断如果是当前key
                if (k == key) {
                    //直接更新值,然后返回
                    e.value = value;
                    return;
                }
                //如果k==null
                //这里可能有同学会问为啥这里会为空?
                //因为ThreadLocal是弱引用,所以会被GC回收掉,但是value没有被回收
                //所以会导致这种情况,下面会详细解释下
                if (k == null) {
                    //清理无效元素并且设置值
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
            //不为null的时候就直接设置元素
            tab[i] = new Entry(key, value);
            int sz = ++size;
            //这里返回true说明有无效元素
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                //扩容
                rehash();
        }

我们来梳理下set方法的流程:

1,取hash值,找到数组中对应的下标i
2,如果i对应的元素不为空,说明发生hash冲突
3,从i开始往后遍历,判断后面的元素的key
4,如果key等于当前的key,直接更新值,然后返回
5,如果key==null说明一部分弱引用被GC回收了,产生了无效元素,这个时候会去清理无效元素并设置值
6,如果i对应的元素为空的话就可以直接设置值,并判断是否需要扩容,然后结束

从这个方法里面我们可以知道是怎么解决hash冲突的,然后我们看下是怎么设置value值的

replaceStaleEntry方法;

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // Back up to check for prior stale entry in current run.
            // We clean out whole runs at a time to avoid continual
            // incremental rehashing due to garbage collector freeing
            // up refs in bunches (i.e., whenever the collector runs).
            int slotToExpunge = staleSlot;
            //从当前节点向前遍历,找到最前面一个无效的位置slotToExpunge
            for (int i = prevIndex(staleSlot, len);
                 //如果元素为空的话就退出了循环
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
            //从当前元素向后遍历
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                if (k == key) {
                    //找到了key后,先给e设置value值
                    e.value = value;
                    //然后把无效的元素和当前元素交换位置
                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists
                    //如果slotToExpunge == staleSlot说明,之前向前遍历的过程中没有找到前面的无效位置
                    //那就就从当前位置开始清理
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    //当expungeStaleEntry方法碰到了空的entry后,返回了空的entry的i, 这里的len是长度
                    //继续向后清理
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
                //如果当前的元素已经无效,并且向前扫描过程中没有无效元素,则更新slotToExpunge为当前位置
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
            //如果key在table中不存在,则在原地放一个即可
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            //发现任何无效元素,则做一次清理
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

流程梳理:

1,从当前节点向前遍历,找到最前面一个无效的位置并记录到slotToExpunge
2,从当前元素向后遍历,因为之前是碰到了key==null的元素才会走到这里,所以这个遍历要尝试继续寻找key相等的元素
3,如果找到了key相等的元素,更新值,然后把之前找到的无效的元素和当前元素交换位置,然后从之前找到的最前面的无效位置开始往后清理
4,如果没有找到key相等的元素,直接在当前无效元素的位置设置key和value
5,根据slotToExpunge判断,是否需要再次清理

现在我们知道了当找到key==null的元素后,是怎么设置value值的
那么到底是怎么清理元素的呢,我们来看下

cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);

首先看下expungeStaleEntry

//从当前位置开始往后遍历清理,将无效对象清理,将指向entry的table[i]置为null,直到扫到空entry
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            //首选把当前位置清理掉
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            Entry e;
            int i;
            //然后往后遍历
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                //发现无效的都清理掉
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                    //rehash,重新分配下标
                } else {
                    int h = k.threadLocalHashCode & (len - 1);
                    if (h != i) {
                        tab[i] = null;

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            //这里的i是当e = tab[i]) == null的时候的i
            return i;
        }

流程梳理

1,首先把当前无效位置的值给清理掉,然后size–
2,开始往后遍历,发现无效的都清理掉
3,有效的元素重新分配下标
4,当tab[i] == null时,结束遍历并返回i

这块的逻辑其实比较清晰简洁,注意这里返回的条件是tab[i] == null, 这个是一个线性探测,这时候其实i后面的元素可能也会存在无效元素,于是继续清理,看下

cleanSomeSlots方法

		//从i+1节点继续开始扫描清理无效元素
        //注意i是之前返回了下标,是不可能是无效元素的,所以从i+1开始
        //这里的n是数组长度,代表清理次数log n
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

流程梳理:

1,找到第i+1个元素,判断它是否是无效元素
2,如果是,调用上面的expungeStaleEntry方法来清理无效元素(只能清理到下一个元素为null之前)
3,如果不是则继续遍历,注意这里只会遍历log n 次

这个被称为一次启发式清理的流程
这里的启发式清理和之前的线性探测清理能清理掉大部分的无效元素

最后看看扩容的流程

rehash

 private void rehash() {
            //先全量清理一遍无效元素(全量)
            expungeStaleEntries();
            // Use lower threshold for doubling to avoid hysteresis
            //超过负载就扩容
            //因为做了一次清理,所以size很可能会变小。
            //ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容,
            //threshold默认为len*2/3,所以这里的threshold - threshold / 4相当于len/2
            if (size >= threshold - threshold / 4)
                resize();
        }

扩容前会先全量清理一遍无效的数据,然后开始扩容,其实这里的resize的扩容流程其实和普通数组的扩容差不多

因为get的流程和remove的代码都比较简单,这里就不赘述了 大家有兴趣的话就自行查阅~

为啥可能会出现内存泄露及解决方案?

看下Entry的结构

static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

可以发现这里的ThreadLocal是一个弱引用, 什么时候这个弱引用会被清理呢?
没有外部强引用来引用这个ThreadLocal实例,这时发生GC的话,就会被清理掉,
给大家举个最简单的内存泄露的例子

 public static void main(String[] args) {
        //强引用
        ThreadLocal<String> local = new ThreadLocal<>();
        local.set("aaa");
        //释放了强引用
        local = null;
        //这时候的GC发现ThreadLocal的强引用local不在了,就会去清理掉ThreadLocal的弱引用
        System.gc();
        //此时如果线程还在运行(例如线程池中的线程)的话,就发生了内存泄露
    }

其实我们正常使用ThreadLocal时, 其实是较少会有内存泄露的问题的, 只是会有这种可能,
现在大家外面谈到ThreadLocal都是谈虎色变,以讹传讹,其实大可不必,不过这是还是建议大家搭配remove方法使用,有备无患~
大家只有知道ThreadLoca内存泄露的原因,才能更好的使用它.

这里有同学会问, 弱引用这么麻烦, 为啥还要这么设计?这里的key直接用强引用不行吗?

这里我们分两种情况讨论:

key 使用强引用:上例的local ==null后, local断开了对ThreadLocal实例的强引用 ,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
key 使用弱引用:上例的local ==null后, local断开了对ThreadLocal实例的强引用 ,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。

比较两种情况,我们可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障

因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。

总结

结论都在上面了, 有啥问题或者意见欢迎大家指出,一起进步,共勉~

上一篇:不用js,html+css也能轻松实现tab选项卡


下一篇:SQL基础(二)