上文 :秒级达百万高并发框架-Disruptor


TransmitterableThreadLocal介绍

    TransmitterableThreadLocal简称TTL 是阿里巴巴开源的一个框架。TransmittableThreadLocal是对Java中的ThreadLocal进行了增强和扩展。它旨在解决在线程池或异步任务调用链中,ThreadLocal无法正确传递值的问题。

TransmitterableThreadLocal源码:https://github.com/alibaba/transmittable-thread-local

754a9076972d6129c7fc57468b2aaacb.png

TransmitterableThreadLocal解决了什么问题?

正常情况下,当使用线程池或异步任务时,原始的ThreadLocal在线程切换或任务执行时无法将设置的值正确传递给子线程或后续任务。这使得在使用线程池或异步编程时,ThreadLocal的使用变得非常复杂。TransmittableThreadLocal通过在线程切换时显式地传递ThreadLocal的值,解决了上述问题。它提供了一种机制,可以自动将ThreadLocal的值从父线程传递到子线程,并确保在整个任务调用链中正确传递。

使用TransmittableThreadLocal时,您可以像使用普通的ThreadLocal一样设置和获取值,但它会自动处理值的传递。这样,即使在线程池或异步任务中,也能够正确地共享和传递ThreadLocal的值。

个人理解:其实就是原有线程池的一个加强版,解决上一个线程带给下一线程一些值传递问题。

TransmitterableThreadLocal的使用

引入jar包

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.2</version>
</dependency>

验证demo代码

demo1

package com.ttl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.ttl.TransmittableThreadLocal;

public class MyRunnable implements Runnable {
    private static TransmittableThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();

    @Override
    public void run() {
        threadLocal.set((int) (Math.random() * 100)); // 设置当前线程的值
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("TransmittableThreadLocal value: " + threadLocal.get()); // 获取当前线程的值
    }
}
package com.ttl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author: csh
 * @Date: 2023/7/8 20:37
 * @Description:
 * ttl案例,通过 线程池,将上一个线程带给下一线程。
 *
 */
public class StudyTtl {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        MyRunnable sharedRunnable = new MyRunnable();

        executorService.execute(sharedRunnable);
        executorService.execute(sharedRunnable);

        executorService.shutdown();
    }
}

结果

TransmittableThreadLocal value: 99
TransmittableThreadLocal value: 25

demo2

package com.ttl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.alibaba.ttl.TransmittableThreadLocal;

public class TransmittableThreadLocalDemo {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 创建TransmittableThreadLocal实例
        TransmittableThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>();

        // 在主线程设置值
        threadLocal.set(42);
        System.out.println("主线程 - ThreadLocal value: " + threadLocal.get());

        // 提交任务到线程池
        executorService.execute(() -> {
            // 打印子线程中的ThreadLocal值
            System.out.println("子线程 - ThreadLocal value: " + threadLocal.get());
        });

        // 关闭线程池
        executorService.shutdown();
    }
}

结果

主线程 - ThreadLocal value: 42
子线程 - ThreadLocal value: 42

TransmitterableThreadLocal的实现原理

TransmittableThreadLocal的实现原理主要依赖于Java的InheritableThreadLocal和ThreadLocal。

在Java中,InheritableThreadLocal是一个可以在父线程和子线程之间传递值的类。它通过子线程继承父线程的值,并且可以在子线程中对该值进行修改。然而,InheritableThreadLocal并不能满足在线程池或异步任务场景下的需求,因为它仅在线程创建时继承值,而在线程切换 or 任务执行后无法正确地传递值。

TransmittableThreadLocal通过扩展InheritableThreadLocal类,并重写其相关方法,实现了对ThreadLocal值传递的增强。

具体来说,TransmittableThreadLocal的实现原理如下:

在TransmittableThreadLocal中,每个ThreadLocal对象都有一个对应的Holder对象(比如TransmitterableThreadLocal.TransmitterableThreadLocalHolder),用于保存值的传递。

当主线程设置ThreadLocal值时,TransmittableThreadLocal会将值存储在Holder对象中。

当创建子线程时,TransmittableThreadLocal会使用InheritableThreadLocal的特性,将父线程中的Holder对象复制到子线程中。

在子线程中,当通过TransmittableThreadLocal获取值时,它会先检查当前线程是否有Holder对象。如果没有,则会从父线程中获取Holder对象,并拷贝一份到子线程中,以确保值的正确传递。

当父线程的ThreadLocal值发生改变时,TransmittableThreadLocal还会同步更新子线程的对应值,以保持值的一致性。

总结起来,TransmittableThreadLocal的实现机制主要通过继承InheritableThreadLocal类,重写相关方法,并使用Holder对象进行值的传递和同步更新。这样就能够在线程切换或异步任务中正确地传递ThreadLocal的值。

TransmitterableThreadLocal核心源码学习

类图

869f087c32eb93bcd3c836e9eb833d3a.png

com.alibaba.ttl.TransmittableThreadLocal实现如下

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> {
    private transient ConcurrentMap<Thread, T> threadLocalValues = new ConcurrentHashMap<>();
    private transient ConcurrentMap<Thread, ThreadLocalMap> inheritableThreadLocals = new ConcurrentHashMap<>();

    @Override
    public void set(T value) {
        Thread currentThread = Thread.currentThread();

        // 检查当前线程是否存在 ThreadLocalMap 对象,如果不存在,则创建
        ThreadLocalMap threadLocals = inheritableThreadLocals.get(currentThread);
        if (threadLocals == null) {
            threadLocals = new ThreadLocalMap();
            inheritableThreadLocals.put(currentThread, threadLocals);
        }

        // 设置当前线程的值
        threadLocals.set(this, value);

        // 存储当前线程的值
        threadLocalValues.put(currentThread, value);
    }
    //初始化值方法
    @Override
    protected T initialValue() {
        return null;
    }
    //获取父类的值
    @Override
    protected T childValue(T parentValue) {
        return parentValue;
    }
    //获取方法
    @Override
    public T get() {
        Thread currentThread = Thread.currentThread();

        // 首先尝试从 threadLocalValues 获取当前线程的值
        T value = threadLocalValues.get(currentThread);
        if (value == null && !threadLocalValues.containsKey(currentThread)) {

            // 如果 threadLocalValues 中不存在当前线程的值,则尝试从 inheritableThreadLocals 获取
            ThreadLocalMap threadLocals = inheritableThreadLocals.get(currentThread);
            if (threadLocals != null) {
                value = threadLocals.get(this);
            }

            // 如果还是没有获取到值,则调用 initialValue() 方法初始化
            if (value == null) {
                value = initialValue();

                // 将值存储到 threadLocalValues 和 inheritableThreadLocals 中
                threadLocalValues.put(currentThread, value);
                if (threadLocals == null) {
                    threadLocals = new ThreadLocalMap();
                    inheritableThreadLocals.put(currentThread, threadLocals);
                }
                threadLocals.set(this, value);
            }
        }
        return value;
    }
    //删除当前线程的内容
    @Override
    public void remove() {
        Thread currentThread = Thread.currentThread();
        threadLocalValues.remove(currentThread);

        ThreadLocalMap threadLocals = inheritableThreadLocals.get(currentThread);
        if (threadLocals != null) {
            threadLocals.remove(this);
        }
    }
}

其它源码可自行了解~

最后

    ttl这个框一般是用于链路跟踪技术的场景,可以将当前线程生成的一个taskid传递到下一个线程,然后由下一个线程来处者或者一些线上特殊的业务场景需要前后或多个线程依次传递的一个场景,但是建议还是通过新增字段或统一缓存比如redis这种来做这种操作,因为如果仅仅是一个线程内的,很有可能因为机器需要重启或一些未考虑到的操作,导致该操作的值丢失,而且这种耦合到你的业务代码中不太合理,特别是关于业务代码这种,如果实在没有必要或有更好的方案建议还是要考虑场景是否合适,本人就因为看过太多同学为了炫耀技术,导致项目代码维护及发现一些很难排查的线上事故~~~

参考文献:

https://segmentfault.com/a/1190000041954190?utm_source=sf-similar-article

https://github.com/alibaba/transmittable-thread-local

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐