线程模型精讲 · 上下文传播三层机制 · 背压策略设计 · 多租户隔离 · 两次错误修复的完整复盘


引言:一个看起来正确的系统,为什么会持续出错

这篇文章来自一次真实的生产排查经历。

系统上线后陆续暴露出几个问题:日志里 traceId 神秘消失、高并发时 Tomcat 接口响应变慢、某个用户的审核任务结果找不到记录——每个问题看起来彼此独立,排查起来毫无头绪。

更棘手的是:针对第三个问题的两次修复,都引发了新的 Bug。第一次修复只解决了表面现象;第二次修复逻辑上看起来完全正确,却依然在高并发下失效——排查到最后,才意识到根因藏在 Java 线程模型最底层的一个鲜为人知的细节里。

这五个问题,每一个单独拿出来都是常见题,但放在异步编程场景下,它们相互叠加,成为了系统性的「异步地雷」。

本文以这次经历为蓝本(代码已脱敏为伪代码),完整还原从现象发现 → 根因定位 → 解决方案 → 修复失败 → 再次定位 → 正确修复的全过程,并在最后提炼出可复用的架构原则和诊断方法论。


第〇章:系统架构与线程模型

0.1 业务场景

系统是一个「文档智能分析平台」。用户上传一份文档,平台从多个维度分析文档内容,每个维度由独立的 AI Agent 负责。

核心约束:单次 AI 调用耗时 120s ~ 600s

这个约束决定了系统必须采用全异步架构:

  • 用户请求立即返回(异步处理,不等 AI 结果)

  • 后台并发调用多个 AI Agent

  • 全部完成后合并结果写入数据库

系统骨架伪代码:

// ===== 入口层:Tomcat 线程 =====
public ReviewResp createReview(ReviewReq req) {
    Long recordId = saveRecord(req);                           // 1. DB 保存任务记录
    Long tenantId = SecurityFrameworkUtils.getTenantId();      // 2. 从安全上下文获取租户
    submitCoordinatorTask(                                     // 3. 异步提交,立即返回
        () -> executeReviewTask(recordId, tenantId));
    return buildResp(taskId);
}
​
// ===== 协调层:coordinatorPool 线程池 =====
void executeReviewTask(Long recordId, Long tenantId) {
    String content = extractContent(fileUrl);  // 同步 IO
​
    CompletableFuture<String> f1 = callAiAsync(RULE,    content, tenantId);
    CompletableFuture<String> f2 = callAiAsync(CONTENT, content, tenantId);
    CompletableFuture<String> f3 = callAiAsync(FORMAT,  content, tenantId);
    CompletableFuture<String> f4 = callAiAsync(DYNAMIC, content, tenantId);
​
    CompletableFuture.allOf(f1, f2, f3, f4)
        .whenCompleteAsync((v, ex) -> mergeAndSave(recordId, f1, f2, f3, f4),
            coordinatorPool);
}
​
// ===== AI 调用层:aiCallPool 线程池(每次 120s~600s)=====
CompletableFuture<String> callAiAsync(AgentType type, String content, Long tenantId) {
    return CompletableFuture.supplyAsync(
        wrapSupplier(() -> httpCallAiService(type, content)),
        aiCallPool);
}

线程池初始配置(问题发生时):

ThreadPoolExecutor coordinatorPool = new ThreadPoolExecutor(
    5, 10, 60L, SECONDS,
    new LinkedBlockingQueue<>(200),
    new CallerRunsPolicy()    // ← 埋下隐患
);
​
ThreadPoolExecutor aiCallPool = new ThreadPoolExecutor(
    5, 15, 60L, SECONDS,
    new LinkedBlockingQueue<>(200),
    new CallerRunsPolicy()    // ← 同样埋下隐患
);

0.2 三层线程模型与任务流转

单个请求的完整线程流转:


AI服务aiCallPoolcoordinatorPoolTomcat线程AI服务aiCallPoolcoordinatorPoolTomcat线程同步阶段立即返回,Tomcat线程释放协调阶段Future链构建完毕,协调线程释放AI调用阶段(120s~600s/次)合并阶段saveRecord 保存任务记录runAsync(wrapRunnable(task))extractContent 同步IO提取文本supplyAsync(RULE)supplyAsync(CONTENT)supplyAsync(FORMAT)supplyAsync(DYNAMIC)并发HTTP调用各Agent陆续返回allOf完成,whenCompleteAsync触发mergeAndSave 合并写库

这个模型看起来清晰合理,实际上埋了五个地雷。


第一章:陷阱一——日志 traceId 全面丢失(可观测性危机)

1.1 现象还原

接入链路追踪后,观察一次完整审查请求的日志:

[traceId=abc123] 保存审查记录成功,recordId=1001
[traceId=abc123] 开始提取文档内容,recordId=1001
[           ] 开始调用 RULE Agent,recordId=1001      ← traceId 消失!
[           ] 开始调用 CONTENT Agent,recordId=1001   ← traceId 消失!
[           ] mergeAndSave 合并结果,recordId=1001    ← traceId 消失!

一旦任务跨越线程边界,traceId 全部丢失。没有 traceId,整条链路无法串联,排查问题只能靠猜。

1.2 MDC 的本质:ThreadLocal 的别名

理解丢失原因,必须先搞清楚 MDC 是什么。

// MDC 底层等价伪代码
class MDC {
    private static final ThreadLocal<Map<String, String>> ctx = new ThreadLocal<>();
​
    public static void put(String key, String val) {
        getOrCreate().put(key, val);  // 仅对当前线程有效
    }
​
    public static String get(String key) {
        Map<String, String> map = ctx.get();
        return map == null ? null : map.get(key);  // 其他线程取到的是 null
    }
}

MDC 的三个核心特性

  1. 线程绑定:只对当前线程可见,跨线程提交后目标线程的 MDC 是独立的,默认为空

  2. 线程池复用时会残留:线程执行完任务后,MDC 若未 clear(),下次复用时仍持有上次数据

  3. CompletableFuture 无 executor 的回调继承完成线程的 MDCthenApply/whenComplete 在完成前一阶段的同一线程上执行

1.3 三个丢失点精准定位

丢失点 1:wrapSupplierfinally 中执行了 MDC.clear()
static <T> Supplier<T> wrapSupplier(Supplier<T> supplier) {
    Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
    return () -> {
        try {
            MDC.setContextMap(mdcSnapshot);   // 注入 MDC
            return supplier.get();            // AI 调用(阻塞 120s~600s)
        } finally {
            MDC.clear();   // ← 问题根源:清掉了同线程后续回调的 MDC!
        }
    };
}

MDC.clear() 清空了当前线程的 MDC。AI 调用完毕进入 finally 后,该线程的 MDC 被清空。但此时 Future 上还挂着无 executor 的 whenComplete 回调,它在同一线程上立即执行——此时 MDC 已经是空的。


日志系统aiCallPool线程日志系统aiCallPool线程wrapSupplier 注入 MDC traceId=abc123MDC 已空traceId 字段为空httpCallAiService() 执行 120s+finally MDC.clear()whenComplete 回调触发(同一线程)log.info("RULE Agent 返回")

丢失点 2:部分子任务直接 supplyAsync 裸提交,未经 MDC 包装
// 某次迭代新增子任务,忘记用 wrapSupplier 包装
CompletableFuture.supplyAsync(
    () -> runSubTask(params),  // ← 执行线程无 MDC
    coordinatorPool
);
丢失点 3:whenCompleteAsync 的提交者线程 MDC 不确定

coordinatorPoolaiCallPool线程2 (CONTENT 最后完成)aiCallPool线程1 (RULE)coordinatorPoolaiCallPool线程2 (CONTENT 最后完成)aiCallPool线程1 (RULE)最后一个完成,MDC 已空从 A2 捕获的 MDC = 空mergeAndSave 日志无 traceIdRULE完成,finally MDC.clear()CONTENT完成,finally MDC.clear()提交 whenCompleteAsync 回调

allOf 由最后完成的子 Future 触发,谁最后完成谁就是提交者。若提交者线程的 MDC 已被清空,coordinatorPool 执行合并回调时同样没有 traceId。

1.4 wrapRunnablewrapSupplier 的不对称:代码演化遗漏

检查工具类时发现一个典型的代码演化遗漏问题:

// wrapRunnable(Runnable 版):使用了 TtlRunnable,TTL 上下文正确传播
static Runnable wrapRunnable(Runnable task) {
    Map<String, String> mdc = MDC.getCopyOfContextMap();
    return TtlRunnable.get(() -> {    // ← 自动捕获/还原所有 TransmittableThreadLocal
        MDC.setContextMap(mdc);
        task.run();
    });
}
​
// wrapSupplier(Supplier 版):只有普通 Lambda,TTL 上下文全部丢失
static <T> Supplier<T> wrapSupplier(Supplier<T> supplier) {
    Map<String, String> mdc = MDC.getCopyOfContextMap();
    return () -> {                    // ← 普通 Lambda,无 TTL 传播
        MDC.setContextMap(mdc);
        return supplier.get();
        // finally: MDC.clear()
    };
}

TtlRunnable 是阿里开源的 transmittable-thread-local 库提供的工具类,在提交时自动捕获所有 TransmittableThreadLocal 变量(多租户上下文、Spring Security Context、数据权限上下文等)的快照,执行时自动还原。

wrapRunnable 某次迭代中加入了 TTL 支持,wrapSupplier 忘记同步修改,导致两者能力不对称。

1.5 各类上下文传播机制对比

机制 同线程继承 跨线程传播 线程池复用安全 典型用途
普通 ThreadLocal 否(残留污染) 局部状态
MDC(set+clear) 是(clear 前) 是(显式 clear) 日志追踪
InheritableThreadLocal 是(仅 new Thread) 否(线程池复用无效) 父子线程传递
TransmittableThreadLocal(TTL) 是(需 TtlRunnable) 多租户/安全上下文
MDC 装饰器 Executor 是(提交时捕获) 统一 MDC 传播

第二章:陷阱二——CallerRunsPolicy 让整条调用链级联崩溃

2.1 为什么 catch(RejectedExecutionException) 是死代码

static CompletableFuture<Void> submitCoordinatorTask(Runnable task) {
    try {
        return CompletableFuture.runAsync(wrapRunnable(task), coordinatorPool);
    } catch (RejectedExecutionException e) {
        // ← 开发者认为:队列满了会被拒绝,catch 里做降级
        log.error("协调任务提交失败,系统过载", e);
        return CompletableFuture.failedFuture(e);
    }
}

这段代码看起来做了完整的拒绝处理,实际上 catch 块永远不会执行

原因在于 coordinatorPool 使用的是 CallerRunsPolicy

// JDK 源码
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();  // ← 直接在调用线程上执行!不抛任何异常!
        }
    }
}

CallerRunsPolicy 队列满时不抛异常,而是直接在 submit() 的调用线程上运行任务。所以 catch 块是一段典型的死代码——开发者误以为有保护,实际上没有。

2.2 级联阻塞时序

AI 调用耗时 120s~600s 的场景下,CallerRunsPolicy 会引发严重的线程污染级联反应


AI服务aiCallPoolcoordinatorPoolTomcat线程AI服务aiCallPoolcoordinatorPoolTomcat线程正常场景:高并发请求进来10个协调线程全部启动,各自提交AI调用max=15线程全满,阻塞300s+CallerRunsPolicy触发:新请求进来协调池有空位,正常接受队列也满!CallerRunsPolicy触发!协调线程直接执行httpCallAiService()阻塞300s!协调池也满了队列满!CallerRunsPolicy触发!Tomcat线程直接执行executeReviewTask()含同步IO,阻塞300s+HTTP接口响应开始变慢!submitCoordinatorTask(review-1)submitCoordinatorTask(review-2)callAiAsync x20submitCoordinatorTask(review-3)callAiAsync(review-3)submitCoordinatorTask(review-N)

级联路径清晰可见:

aiCallPool 满 + AI 调用耗时 300s
  → CallerRunsPolicy:coordinatorPool 线程被迫执行 HTTP 调用,阻塞 300s
    → coordinatorPool 所有线程占满
      → CallerRunsPolicy:Tomcat 线程被迫执行 executeReviewTask()
        → Tomcat 线程阻塞,HTTP 接口响应变慢,影响全站

2.3 拒绝策略选型

策略 行为 适用场景 风险
CallerRunsPolicy 调用线程直接执行 批处理、调用方可阻塞 高延迟场景下拖垮整条调用链
AbortPolicy(默认) RejectedExecutionException 调用方有明确降级逻辑 任务丢失
DiscardPolicy 静默丢弃 允许丢失的统计类任务 任务丢失且无感知
BlockThenRejectPolicy(自定义) 阻塞等待入队,超时才真正拒绝 Web 服务后台任务 推荐

修复后,catch (RejectedExecutionException) 真正生效:

class BlockThenRejectPolicy implements RejectedExecutionHandler {
    private final long timeout;
    private final TimeUnit unit;
​
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) throw new RejectedExecutionException("线程池已关闭");
        try {
            boolean enqueued = executor.getQueue().offer(r, timeout, unit);
            if (!enqueued) throw new RejectedExecutionException("等待入队超时,系统过载");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("等待入队被中断", e);
        }
    }
}

第三章:陷阱三——不知道回调在哪个线程执行

3.1 CompletableFuture 回调线程的唯一规则

没有 executor 参数 → 在完成前一阶段的线程上就地执行 *Async + executor → 提交到指定 executor 执行

callAiAsync(RULE, content)
// ↑ aiCallPool 线程执行(阻塞 120s~600s)
​
    .whenComplete((r, ex) -> log.info("RULE 返回"))
// ↑ 同一个 aiCallPool 线程(无 executor → 就地继承)
​
callAiAsync(DYNAMIC, content)
    .thenApply(raw -> parseResult(raw))
// ↑ 同一个 aiCallPool 线程(无 executor → 就地继承)
​
    .thenComposeAsync(ids -> callNextAiAsync(ids), coordinatorPool)
// ↑ 提交到 coordinatorPool,在协调线程上执行
​
CompletableFuture.allOf(f1, f2, f3, f4)
    .whenCompleteAsync((v, ex) -> mergeAndSave(...), coordinatorPool)
// ↑ 由「最后完成的子 Future 所在线程」提交到 coordinatorPool

3.2 回调线程模型全景


无executor(whenComplete)

有executor(whenCompleteAsync)

无executor(thenApply)

有executor(thenComposeAsync)

callAiAsync(RULE)

aiCallPool线程
httpCallAiService()
阻塞120s~600s

同一aiCallPool线程
执行日志回调

coordinatorPool线程
执行mergeAndSave

callAiAsync(DYNAMIC)

aiCallPool线程
httpCallAiService()

同一aiCallPool线程
parseResult

coordinatorPool线程
nextStageAsync

3.3 自引用线程池的死锁误区

很多人看到协调线程池提交任务给自己会担心死锁:

// 协调线程上,把子任务也提交到协调线程池
CompletableFuture.supplyAsync(() -> runSubTask(params), coordinatorPool);
allOf(...).whenCompleteAsync(callback, coordinatorPool);

实际上不会死锁。原因:

executeReviewTask() 构建完所有 Future 链后立即返回
→ 协调线程释放,回到线程池等待
→ AI 调用结果到来时,whenCompleteAsync 提交的任务
  能获取到空闲的协调线程来执行
→ 不存在「线程 A 持有锁等线程 B,线程 B 等线程 A」的循环等待

但自引用在高负载下会带来性能退化:协调线程池接近满载时,自引用提交的子任务需要排队,增加整体延迟。

3.4 「提交者线程」是上下文传播的关键隐患

对于 *Async(executor) 形式,执行线程的上下文由提交者线程决定,而不是由 executor 本身决定:

提交者线程有正确 MDC → 可通过 Executor 装饰器传播
提交者线程 MDC 为空 → 装饰器捕获到空 context,执行线程也无 MDC

allOf(...).whenCompleteAsync(callback, pool) 的提交者是「最后完成的那个子 Future 的线程」,这个线程是运行时才能确定的不确定值。这是第五章陷阱的伏笔。


第四章:陷阱四——一个 Agent 失败,线程和等待时间全部浪费

4.1 allOf 的失败语义

// f2 在 200s 时异常
CompletableFuture<String> f1 = callAiAsync(RULE,    content); // 预计 400s
CompletableFuture<String> f2 = callAiAsync(CONTENT, content); // 200s 时异常
CompletableFuture<String> f3 = callAiAsync(FORMAT,  content); // 预计 350s
​
CompletableFuture.allOf(f1, f2, f3).whenCompleteAsync((v, ex) -> {
    if (ex != null) {
        updateStatus(recordId, FAILED);
        // f2 失败,整体标记失败,但:
        // f1 还需要 200s 才自然结束 → aiCallPool 线程被白白占用
        // f3 还需要 150s 才自然结束 → aiCallPool 线程被白白占用
        return;
    }
    mergeResults(f1, f2, f3);
}, coordinatorPool);

两个问题:

  1. 结果浪费:f1 和 f3 的分析结果无法使用

  2. 线程浪费:f1 和 f3 的 aiCallPool 线程无法服务新请求

4.2 cancel(false) 的本质与局限

CompletableFuture.cancel() 和大多数人理解的不同:

// CompletableFuture.cancel() 源码精简
public boolean cancel(boolean mayInterruptIfRunning) {
    // mayInterruptIfRunning 参数被忽略!
    // 没有底层线程引用,无法发送 interrupt 信号
    return internalComplete(new AltResult(new CancellationException()));
    // 只是把 Future 状态设置为 CancellationException
    // 底层的 httpCallAiService() 仍在运行!
}
方面 FutureTask.cancel(true) CompletableFuture.cancel(any)
中断底层线程 是(发送 interrupt) 否(无底层线程引用)
Future 状态变化 CancellationException CancellationException
下游 thenApply 不再执行 不再执行
底层 HTTP 连接 依赖 HTTP 客户端响应 interrupt 不中断

cancel(false)实际价值:立即将 Future 状态标记为失败,阻止下游回调继续执行,但无法中断底层 HTTP 线程。

4.3 Fail-Fast + AtomicBoolean CAS 防重

List<CompletableFuture<?>> allFutures = Arrays.asList(f1, f2, f3, f4);
AtomicBoolean failed = new AtomicBoolean(false);
​
for (CompletableFuture<?> future : allFutures) {
    future.whenComplete((result, ex) -> {
        if (ex != null
                && !(ex instanceof CancellationException)
                && failed.compareAndSet(false, true)) {  // CAS:只有第一个失败者执行
​
            log.error("Agent 调用失败,触发 Fail-Fast,recordId={}", recordId, ex);
            updateStatus(recordId, FAILED);   // 立即通知用户
​
            allFutures.forEach(f -> {
                if (!f.isDone()) f.cancel(false);  // 阻止后续回调
            });
        }
    });
}

AI服务aiCallPool线程3 FORMATaiCallPool线程2 CONTENTaiCallPool线程1 RULEcoordinatorPoolAI服务aiCallPool线程3 FORMATaiCallPool线程2 CONTENTaiCallPool线程1 RULEcoordinatorPoolFuture链构建完,协调线程释放whenComplete触发compareAndSet(false,true)=true 首次失败HTTP线程不受影响,仍在等待AI响应allOf完成,whenCompleteAsync触发failed.get()=true,跳过合并直接returncallAiAsync(RULE)callAiAsync(CONTENT)callAiAsync(FORMAT)HTTP RULE调用(预计400s)HTTP CONTENT调用HTTP FORMAT调用(预计350s)200s后返回异常updateStatus(FAILED) 立即写库f1.cancel(false) 状态变CancellationExceptionf3.cancel(false) 状态变CancellationException400s后返回(结果被丢弃,无后续回调)350s后返回(同上)

Fail-Fast 的核心价值:将用户感知失败的时间,从「最慢 Agent 响应时间(600s)」缩短到「第一个失败 Agent 响应时间(200s)」。

compareAndSet 的作用:当多个 Agent 几乎同时失败时,保证只有第一个 CAS 成功的线程执行 updateStatus,避免重复写库。


第五章:陷阱五——两次错误修复引发的多租户串扰(最难排查)

前四个陷阱修复上线后,系统运行一段时间,开始出现一个新的、更诡异的问题。

5.1 现象:记录时存时不存在

线上告警日志:

[traceId-xxx] 保存审查记录成功,recordId: 2034597939818225665
(约 1.5 分钟后,AI 调用返回)
[traceId-xxx] whenComplete 回调内异常,recordId: 2034597939818225665
ServiceException: 审查记录不存在

诡异之处在于:

  • 记录明明已经保存成功(日志 1)

  • 1.5 分钟后却查不到(日志 2)

  • 直接用 DB 工具查,记录就在那里

5.2 诊断日志揭开真相

在回调方法的入口加入诊断日志,输出当前线程的上下文状态:

诊断日志:
  requestTenantId: 171                        ← 方法参数,创建记录时的真实租户
  currentTenantId: 2034250086314766337        ← TenantContextHolder 当前值(另一个用户!)
  currentTenantVisibility: absent             ← 用当前租户上下文查不到记录
  ignoreTenantVisibility: present(deleted=false, reviewStatus=1)  ← 忽略租户后记录存在
  thread: general-review-coordinator8

直接结论:记录在 DB 中真实存在,但系统使用了错误的 tenant_id 查询。

原来,系统使用 MyBatis Plus 的多租户插件,所有 selectById 都会被拦截器自动加上 AND tenant_id = ?,这个 ? 的值来自 TenantContextHolder.getTenantId()

记录保存时:TenantContextHolder = 171 → tenant_id = 171 写入 DB
回调执行时:TenantContextHolder = 2034... → SELECT WHERE tenant_id = 2034... → 查不到

问题变成了:为什么回调线程上的 TenantContextHolder 是另一个用户的 ID?

5.3 修复一失败:MdcPropagatingExecutor 只传了 MDC

第一次修复思路:在 whenCompleteAsync 使用的 Executor 里同时传播上下文。

public class MdcPropagatingExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();  // 只捕获 MDC
        delegate.execute(() -> {
            try {
                if (mdcSnapshot != null) MDC.setContextMap(mdcSnapshot);
                command.run();
                // TenantContextHolder(TransmittableThreadLocal)完全没有传!
            } finally {
                MDC.clear();
            }
        });
    }
}

问题:TenantContextHolder 基于 TransmittableThreadLocalMdcPropagatingExecutor 只捕获了 MDC,TTL 上下文(包括租户 ID)完全没有传播。

上线后问题依然复现。

5.4 修复二失败:ContextPropagatingExecutor 捕获时机错了

第二次修复思路:在 Executor 的 execute() 里加入 TtlRunnable,同时传播 MDC 和 TTL。

public class ContextPropagatingExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
        Runnable wrapped = TtlRunnable.get(() -> {  // ← 此处捕获 TTL 快照
            try {
                if (mdcSnapshot != null) MDC.setContextMap(mdcSnapshot);
                command.run();
            } finally {
                MDC.clear();
            }
        });
        delegate.execute(wrapped);
    }
}

这次逻辑上看起来完全正确:MDC 和 TTL 都传播了。但上线后问题依然在特定并发条件下复现

为什么?

关键在于 execute() 被谁在什么时机调用

开发者以为的调用时机:
  whenCompleteAsync 注册时 → 此时协调线程上下文正确(tenantId=171)
​
实际的调用时机:
  allOf 完成时 → 由完成最后一个子 Future 的 AI 线程调用
              → TtlRunnable.get() 捕获的是此时 AI 线程的 TTL
              → AI 线程的 TTL 基线 = 另一个租户的脏值

5.5 根因:InheritableThreadLocal 基线继承 + 提交时机错位

这是整个问题链中最隐蔽的部分。

TransmittableThreadLocal 继承自 InheritableThreadLocal。Java 的 InheritableThreadLocal 有一个特性:线程创建时,子线程会从父线程继承所有 InheritableThreadLocal 的值

线程池扩容时机:
  某次高并发,父线程持有 tenantId = 2034...(另一个用户正在发请求)
  → 线程池新建工作线程
  → 新线程从父线程继承 tenantId = 2034... 作为 InheritableThreadLocal 初始值
  → 该值成为这条线程的「TTL 基线」,永久存在于该线程上

TtlRunnable 的工作机制是 save/restore:

执行前(replay):
  备份子线程当前 TTL(= 脏基线 2034...)
  注入父任务捕获的快照(= 正确值 171)
​
执行后(restore):
  还原子线程 TTL = 脏基线 2034...(不是 null!)

因此,每次 TtlRunnable 任务结束,线程 TTL 都会恢复回脏基线。任何在这个线程上以普通 lambda 方式运行的代码,看到的都是这个脏基线。

完整的时序链路:


回调线程AI调用线程协调线程HTTP请求线程回调线程AI调用线程协调线程HTTP请求线程线程池扩容时继承父线程TTL基线 = tenant-B (脏)replay: 备份基线B, 注入171执行 executeReviewTask注册 whenCompleteAsync(callback, executor)此时 tenant=171 正确!方法返回restore: TTL恢复为基线B (脏)AI线程同样有自己的脏基线TtlRunnable.get() 在此时捕获捕获的是AI线程的脏基线!记录不存在!TtlRunnable(tenant-171)AI调用完成,TtlRunnable restore线程TTL = 自己的脏基线ContextPropagatingExecutor.execute(callback)回调执行,TTL = 脏基线selectById WHERE tenant_id = 脏值

两次修复失败的根本原因对比

修复方案 失败原因
MdcPropagatingExecutor 只传 MDC,TTL(租户)完全未传
ContextPropagatingExecutor + TtlRunnable 在 Future 完成时捕获(execute() 被 AI 线程调用),捕获到的是 AI 线程的脏基线,相当于传了一份更高置信度的错误值

5.6 正确修复:上下文是数据,不是线程状态

两次修复都失败的根本原因在于:它们都试图从线程状态中获取正确的上下文,但线程状态在这个场景下是不可靠的。

正确的思路是:把上下文从线程状态中解耦,作为显式数据通过闭包传递

tenantIdexecuteReviewTask 的方法参数,在注册 whenCompleteAsync 时就已固化为一个确定的值(171),完全不需要从线程状态中读取。

void executeReviewTask(Long recordId, Long tenantId) {
    // ... 构建 futures ...
​
    // tenantId 通过闭包捕获,与线程状态无关
    CompletableFuture.allOf(...)
        .whenCompleteAsync((v, ex) ->
            TenantUtils.execute(tenantId, () -> {  // 显式切换到正确租户
                // 此处 TenantContextHolder.getTenantId() 永远返回 171
                // 无论哪个线程跑这个回调,无论线程 TTL 基线是什么
                mergeAndSave(recordId, ...);
            }),
        coordinatorPool);
}

TenantUtils.execute 实现(标准的 save/set/run/restore 模式):

public static void execute(Long tenantId, Runnable runnable) {
    Long old = TenantContextHolder.getTenantId();
    Boolean oldIgnore = TenantContextHolder.isIgnore();
    try {
        TenantContextHolder.setTenantId(tenantId);
        TenantContextHolder.setIgnore(false);
        runnable.run();
    } finally {
        TenantContextHolder.setTenantId(old);   // 恢复,不污染线程
        TenantContextHolder.setIgnore(oldIgnore);
    }
}

三种方案的本质对比


正确修复:TenantUtils.execute

注册时刻
tenantId已固化在闭包中
值=171,与线程无关

回调内显式setTenantId(171)
线程TTL基线是什么无所谓
租户ID永远正确

修复二:ContextPropagatingExecutor

Future完成时
execute()被AI线程调用
TtlRunnable.get()捕获TTL

TTL = AI线程脏基线
传播了一份脏值
租户ID仍然错误

修复一:MdcPropagatingExecutor

Future完成时
execute()被AI线程调用
捕获MDC

TTL = AI线程脏基线
租户ID错误

改动极小,效果根本:只需 2 处改动(加 import + 用 TenantUtils.execute 包裹回调体),覆盖回调内所有数据库操作,彻底切断对线程 TTL 状态的依赖。


第六章:从五个陷阱提炼的架构原则

6.1 上下文传播原则:显式优于隐式

传播方式 安全性 适用场景
隐式:依赖 ThreadLocal/TTL 在线程间「自动流动」 低(受线程创建时机、捕获时机、脏基线影响) 同一线程内的短生命周期上下文
半显式:Executor 装饰器在提交时捕获快照 中(依赖提交者线程状态正确,捕获时机确定) traceId 等无强业务约束的上下文
显式:参数 + 闭包 + 框架工具在回调内注入 高(与线程状态完全解耦,不受脏基线影响) 租户 ID、用户 ID 等业务强相关上下文

架构建议:将多租户 ID、用户 ID 等具有业务含义的上下文,视为请求级别的数据而非线程级别的状态。在入口处提取后显式传递,在需要的地方显式注入,不依赖 ThreadLocal 「自动流动」。

6.2 背压设计原则:调用方不应承担工作线程角色

CallerRunsPolicy 的设计假设是「调用方可以阻塞」。这个假设在批处理、离线任务等场景是合理的,但在 Web 服务中,调用方是 Tomcat 线程或业务协调线程,它们的阻塞会直接影响用户可见的响应时间。

AI 调用这类高延迟任务,一旦让调用方线程充当工作线程,级联阻塱会沿调用栈向上传播,最终影响全站。

架构建议:线程池的拒绝策略必须与调用链上下游的容错策略匹配。对于 Web 服务后台任务,BlockThenRejectPolicy(短暂等待后真正拒绝)是合理选择,它让 catch (RejectedExecutionException) 真正生效,让调用方有机会做降级处理。

6.3 可观测性原则:traceId 必须覆盖每一个线程边界

每一次 runAsyncsupplyAsyncwhenCompleteAsync 都是一次线程边界跨越,traceId 不会自动传递。未覆盖的边界就是日志黑洞——一旦出问题,你看不到整条链路,只能靠 recordId、时间戳等业务字段拼凑。

架构建议:封装统一的 wrapRunnable/wrapSupplier 工具方法,所有异步提交必须经过该方法。Runnable 版和 Supplier 版要保持能力对齐(TTL 支持、MDC 传播),避免演化遗漏。可以通过 Code Review 规范或静态检查约束禁止直接裸提交。

6.4 多租户隔离原则:租户上下文是最高优先级的隔离边界

多租户系统中,任何一处租户上下文泄漏都可能导致 A 用户操作到 B 用户的数据。异步回调场景尤其危险,因为:

  • 问题不会立即暴露,只在特定并发条件下触发(高并发时线程池扩容,脏基线才会出现)

  • 症状看起来像「记录不存在」,不像「租户串扰」,极易误导排查方向

  • 两次看起来合理的修复都没有解决它,因为根因在线程模型的底层

架构建议:异步回调中任何涉及数据库操作的逻辑,一律用显式 tenantId 参数 + 框架提供的 TenantUtils.execute 注入,不依赖线程环境中的 TenantContextHolder。租户上下文的正确性不应依赖运行时的线程状态。


第七章:诊断方法论——四个追问

排查异步并发问题不靠运气,靠的是一套可复用的追问方式。

追问一:「现在是哪个线程?它有什么上下文?」

每当看到 thenApplywhenCompletesupplyAsync*Async(executor) 时,强迫自己停下来回答:

  1. 这段代码运行在哪个线程?(线程池名称?)

  2. 那个线程是在哪里创建/复用的?

  3. 那个线程上有没有 MDC?有没有 TTL 上下文?TTL 基线是什么?

  4. 谁提交了这个任务?提交者线程有正确的上下文吗?

以本文问题为例:

问:f1.whenComplete((r,ex) -> log.info(...)) 在哪个线程?
答:完成 f1 的 aiCallPool 线程(无 executor → 就地继承)
​
问:那个 aiCallPool 线程有 MDC 吗?
答:没有!wrapSupplier 的 finally 刚刚 clear 了
​
问:所以这条日志有 traceId 吗?
答:没有 → 发现丢失点 1

追问二:「跨线程边界时,什么会丢失?」

每次任务跨越线程边界(提交到线程池),要追问:

上下文类型 是否自动跨线程 正确的传播方式
MDC(traceId) wrapWithContext 手动 copy,或 MDC Executor 装饰器
普通 ThreadLocal 不建议跨线程,改用 TTL
TransmittableThreadLocal 是(需 TtlRunnable) TtlRunnable.get() 或显式参数
Spring Security Context 否(默认) 配置 MODE_INHERITABLETHREADLOCAL 或 TTL
Spring @Transactional 事务不能跨线程,异步方法不加 @Transactional
租户 ID(业务相关) 否(可靠方式) 显式参数 + 闭包 + TenantUtils.execute

追问三:「多线程修改同一状态时,用了什么保护?」

whenComplete 回调中修改共享状态时,要追问:

  1. 这个状态会被多个线程同时修改吗?

  2. 用的是哪种线程安全机制

// 反例:多个 Agent 同时失败,并发写库多次
future.whenComplete((r, ex) -> {
    if (ex != null) updateStatus(FAILED);  // ← 没有保护,可能重复执行
});
​
// 正例:AtomicBoolean.compareAndSet 保证只执行一次
AtomicBoolean failed = new AtomicBoolean(false);
future.whenComplete((r, ex) -> {
    if (ex != null && failed.compareAndSet(false, true)) {
        updateStatus(FAILED);  // ← 只有第一个 CAS 成功的线程执行
    }
});

常见的并发状态保护手段:

局部变量(effectively-final): 天然安全,Lambda 中直接使用
AtomicBoolean / AtomicInteger: 单变量的原子读写/CAS
ConcurrentHashMap: 并发 Map
synchronized / ReentrantLock: 临界区保护
CompletableFuture 本身: 内部 CAS 保证 happens-before

追问四:「上下文依赖的是线程状态,还是显式的数据?」

这是本次新增的追问,也是最容易被忽视的一个。

每当写 executor.execute(callback) 时,问:
  → execute() 是在「正确上下文的线程」上调用的吗?
  → 还是在「Future 完成时的某个不确定线程」上调用的?
  → 如果是后者,依赖 Executor 装饰器传播上下文可能是错的
​
每当写 TtlRunnable.get(task) 时,问:
  → get() 在哪个线程上调用?此刻那个线程的 TTL 是正确的吗?
  → 还是已经被 restore 回了脏基线?
​
更根本地问:
  → 这个上下文值(tenantId、userId)能不能作为显式参数传递?
  → 如果能,就用参数+闭包,不要依赖线程状态
  → 线程状态是「共享的、易污染的、时机敏感的」,显式参数是「确定的、不可变的、与线程无关的」

追问四的一个快速检查清单

场景 应使用 原因
traceId 传到异步回调 Executor 装饰器(半显式) traceId 无业务强约束,MDC 泄漏影响可控
租户 ID 传到异步回调 显式参数 + TenantUtils.execute 租户错误会导致跨租户数据访问,影响严重
用户 ID 传到异步回调 显式参数 同上
日志级别的上下文 隐式 TTL 丢失只影响可观测性,不影响正确性

第八章:总结

五个陷阱速查表

问题现象 根因 解决方案
日志 traceId 消失(丢失点 1) wrapSupplier finally 中 MDC.clear(),清掉同线程后续回调的上下文 移除 finally 中的 MDC.clear(),依赖下次任务覆盖
日志 traceId 消失(丢失点 2) supplyAsync 提交,执行线程无 MDC MdcPropagatingExecutor 装饰器统一处理
日志 traceId 消失(丢失点 3) whenCompleteAsync 提交者线程 MDC 不确定 同上(装饰器统一处理)
TTL 上下文丢失(租户/Security) wrapSupplier 未用 TtlRunnable,与 wrapRunnable 不对称 补充 TtlRunnable 封装,两者能力对齐
Tomcat 线程被 AI 调用拖死 CallerRunsPolicy 在高负载下让 Coordinator/Tomcat 线程变成 AI HTTP 工作线程 改用 BlockThenRejectPolicy,让 catch 块真正生效
Agent 失败后等待时间过长 allOf 任一失败不 cancel 其他 Future,线程资源浪费 Fail-Fast + AtomicBoolean.compareAndSet + cancel(false)
whenCompleteAsync 回调中"记录不存在",忽略租户后记录存在 TransmittableThreadLocal 继承脏 TTL 基线;ContextPropagatingExecutor 在 Future 完成时捕获,拿到的是脏值;两次修复均未解决 用显式 tenantId 参数 + TenantUtils.execute() 在回调内注入,彻底脱离线程状态

上下文传播决策树


上下文需要跨越
线程边界吗?

使用 ThreadLocal
同线程内安全

这个上下文有
业务强一致要求吗?
(如租户ID、用户ID)

traceId等可观测性
上下文

Executor装饰器
提交时捕获MDC快照
注意提交者线程时机

上下文值在注册
回调时是否确定?

重新设计:将值
作为参数显式传入

闭包捕获 + 框架工具注入
TenantUtils.execute(tenantId)
与线程状态完全解耦

四条核心结论

  1. MDC 是线程局部的,传播需要显式机制。MdcPropagatingExecutor 装饰器是最低耦合的方案,业务代码零侵入。但要注意它依赖提交者线程有正确的 MDC,捕获时机错误会导致传播失效。

  2. CallerRunsPolicy 是隐形的「线程污染器」,在高延迟场景下会把整条调用链上的所有线程变成工作线程,引发级联阻塞,且配套的 catch (RejectedExecutionException) 是死代码。

  3. CompletableFuture.cancel() ≠ 中断 HTTP,它只是标记 Future 状态为失败,阻止下游回调链,底层线程仍在阻塞。Fail-Fast 的价值是「第一时间通知用户失败」,而不是真正释放资源。

  4. 上下文是数据,不是线程状态TransmittableThreadLocal 在线程池中存在基线继承的隐患,Executor 装饰器的捕获时机可能是错误的。对于业务强相关的上下文(租户 ID、用户 ID),应将其从隐式线程状态提升为显式数据——随参数传递,随闭包捕获,用框架工具在需要的时候注入,而不是依赖线程「自动携带」。

诊断方法论四个追问

追问一:现在是哪个线程?它有什么上下文?TTL 基线是什么?

追问二:跨线程边界时,什么会丢失?

追问三:多线程修改同一状态时,用了什么保护?

追问四:上下文依赖的是线程状态,还是显式的数据?

把这四个追问变成代码审查和问题排查的习惯,能帮你在大多数异步并发问题出现之前就发现并修复它们。


如果本文对你有帮助,欢迎点赞收藏。 如有疑问或不同见解,欢迎在评论区交流。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐