异步地雷:AI 服务接入引发的 CompletableFuture 五大陷阱实录
线程模型精讲 · 上下文传播三层机制 · 背压策略设计 · 多租户隔离 · 两次错误修复的完整复盘
引言:一个看起来正确的系统,为什么会持续出错
这篇文章来自一次真实的生产排查经历。
系统上线后陆续暴露出几个问题:日志里 traceId 神秘消失、高并发时 Tomcat 接口响应变慢、某个用户的审核任务结果找不到记录——每个问题看起来彼此独立,排查起来毫无头绪。
更棘手的是:针对第三个问题的两次修复,都引发了新的 Bug。第一次修复只解决了表面现象;第二次修复逻辑上看起来完全正确,却依然在高并发下失效——排查到最后,才意识到根因藏在 Java 线程模型最底层的一个鲜为人知的细节里。
这五个问题,每一个单独拿出来都是常见题,但放在异步编程场景下,它们相互叠加,成为了系统性的「异步地雷」。
本文以这次经历为蓝本(代码已脱敏为伪代码),完整还原从现象发现 → 根因定位 → 解决方案 → 修复失败 → 再次定位 → 正确修复的全过程,并在最后提炼出可复用的架构原则和诊断方法论。
第〇章:系统架构与线程模型
0.1 业务场景
系统是一个「文档智能分析平台」。用户上传一份文档,平台从多个维度分析文档内容,每个维度由独立的 AI Agent 负责。
核心约束:单次 AI 调用耗时 120s ~ 600s。
这个约束决定了系统必须采用全异步架构:
-
用户请求立即返回(异步处理,不等 AI 结果)
-
后台并发调用多个 AI Agent
-
全部完成后合并结果写入数据库
系统骨架伪代码:
// ===== 入口层:Tomcat 线程 =====
public ReviewResp createReview(ReviewReq req) {
Long recordId = saveRecord(req); // 1. DB 保存任务记录
Long tenantId = SecurityFrameworkUtils.getTenantId(); // 2. 从安全上下文获取租户
submitCoordinatorTask( // 3. 异步提交,立即返回
() -> executeReviewTask(recordId, tenantId));
return buildResp(taskId);
}
// ===== 协调层:coordinatorPool 线程池 =====
void executeReviewTask(Long recordId, Long tenantId) {
String content = extractContent(fileUrl); // 同步 IO
CompletableFuture<String> f1 = callAiAsync(RULE, content, tenantId);
CompletableFuture<String> f2 = callAiAsync(CONTENT, content, tenantId);
CompletableFuture<String> f3 = callAiAsync(FORMAT, content, tenantId);
CompletableFuture<String> f4 = callAiAsync(DYNAMIC, content, tenantId);
CompletableFuture.allOf(f1, f2, f3, f4)
.whenCompleteAsync((v, ex) -> mergeAndSave(recordId, f1, f2, f3, f4),
coordinatorPool);
}
// ===== AI 调用层:aiCallPool 线程池(每次 120s~600s)=====
CompletableFuture<String> callAiAsync(AgentType type, String content, Long tenantId) {
return CompletableFuture.supplyAsync(
wrapSupplier(() -> httpCallAiService(type, content)),
aiCallPool);
}
线程池初始配置(问题发生时):
ThreadPoolExecutor coordinatorPool = new ThreadPoolExecutor( 5, 10, 60L, SECONDS, new LinkedBlockingQueue<>(200), new CallerRunsPolicy() // ← 埋下隐患 ); ThreadPoolExecutor aiCallPool = new ThreadPoolExecutor( 5, 15, 60L, SECONDS, new LinkedBlockingQueue<>(200), new CallerRunsPolicy() // ← 同样埋下隐患 );
0.2 三层线程模型与任务流转
单个请求的完整线程流转:
AI服务aiCallPoolcoordinatorPoolTomcat线程AI服务aiCallPoolcoordinatorPoolTomcat线程同步阶段立即返回,Tomcat线程释放协调阶段Future链构建完毕,协调线程释放AI调用阶段(120s~600s/次)合并阶段saveRecord 保存任务记录runAsync(wrapRunnable(task))extractContent 同步IO提取文本supplyAsync(RULE)supplyAsync(CONTENT)supplyAsync(FORMAT)supplyAsync(DYNAMIC)并发HTTP调用各Agent陆续返回allOf完成,whenCompleteAsync触发mergeAndSave 合并写库
这个模型看起来清晰合理,实际上埋了五个地雷。
第一章:陷阱一——日志 traceId 全面丢失(可观测性危机)
1.1 现象还原
接入链路追踪后,观察一次完整审查请求的日志:
[traceId=abc123] 保存审查记录成功,recordId=1001 [traceId=abc123] 开始提取文档内容,recordId=1001 [ ] 开始调用 RULE Agent,recordId=1001 ← traceId 消失! [ ] 开始调用 CONTENT Agent,recordId=1001 ← traceId 消失! [ ] mergeAndSave 合并结果,recordId=1001 ← traceId 消失!
一旦任务跨越线程边界,traceId 全部丢失。没有 traceId,整条链路无法串联,排查问题只能靠猜。
1.2 MDC 的本质:ThreadLocal 的别名
理解丢失原因,必须先搞清楚 MDC 是什么。
// MDC 底层等价伪代码
class MDC {
private static final ThreadLocal<Map<String, String>> ctx = new ThreadLocal<>();
public static void put(String key, String val) {
getOrCreate().put(key, val); // 仅对当前线程有效
}
public static String get(String key) {
Map<String, String> map = ctx.get();
return map == null ? null : map.get(key); // 其他线程取到的是 null
}
}
MDC 的三个核心特性:
-
线程绑定:只对当前线程可见,跨线程提交后目标线程的 MDC 是独立的,默认为空
-
线程池复用时会残留:线程执行完任务后,MDC 若未
clear(),下次复用时仍持有上次数据 -
CompletableFuture 无 executor 的回调继承完成线程的 MDC:
thenApply/whenComplete在完成前一阶段的同一线程上执行
1.3 三个丢失点精准定位
丢失点 1:wrapSupplier 的 finally 中执行了 MDC.clear()
static <T> Supplier<T> wrapSupplier(Supplier<T> supplier) {
Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
return () -> {
try {
MDC.setContextMap(mdcSnapshot); // 注入 MDC
return supplier.get(); // AI 调用(阻塞 120s~600s)
} finally {
MDC.clear(); // ← 问题根源:清掉了同线程后续回调的 MDC!
}
};
}
MDC.clear() 清空了当前线程的 MDC。AI 调用完毕进入 finally 后,该线程的 MDC 被清空。但此时 Future 上还挂着无 executor 的 whenComplete 回调,它在同一线程上立即执行——此时 MDC 已经是空的。
日志系统aiCallPool线程日志系统aiCallPool线程wrapSupplier 注入 MDC traceId=abc123MDC 已空traceId 字段为空httpCallAiService() 执行 120s+finally MDC.clear()whenComplete 回调触发(同一线程)log.info("RULE Agent 返回")
丢失点 2:部分子任务直接 supplyAsync 裸提交,未经 MDC 包装
// 某次迭代新增子任务,忘记用 wrapSupplier 包装 CompletableFuture.supplyAsync( () -> runSubTask(params), // ← 执行线程无 MDC coordinatorPool );
丢失点 3:whenCompleteAsync 的提交者线程 MDC 不确定
coordinatorPoolaiCallPool线程2 (CONTENT 最后完成)aiCallPool线程1 (RULE)coordinatorPoolaiCallPool线程2 (CONTENT 最后完成)aiCallPool线程1 (RULE)最后一个完成,MDC 已空从 A2 捕获的 MDC = 空mergeAndSave 日志无 traceIdRULE完成,finally MDC.clear()CONTENT完成,finally MDC.clear()提交 whenCompleteAsync 回调
allOf 由最后完成的子 Future 触发,谁最后完成谁就是提交者。若提交者线程的 MDC 已被清空,coordinatorPool 执行合并回调时同样没有 traceId。
1.4 wrapRunnable 与 wrapSupplier 的不对称:代码演化遗漏
检查工具类时发现一个典型的代码演化遗漏问题:
// wrapRunnable(Runnable 版):使用了 TtlRunnable,TTL 上下文正确传播
static Runnable wrapRunnable(Runnable task) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return TtlRunnable.get(() -> { // ← 自动捕获/还原所有 TransmittableThreadLocal
MDC.setContextMap(mdc);
task.run();
});
}
// wrapSupplier(Supplier 版):只有普通 Lambda,TTL 上下文全部丢失
static <T> Supplier<T> wrapSupplier(Supplier<T> supplier) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return () -> { // ← 普通 Lambda,无 TTL 传播
MDC.setContextMap(mdc);
return supplier.get();
// finally: MDC.clear()
};
}
TtlRunnable 是阿里开源的 transmittable-thread-local 库提供的工具类,在提交时自动捕获所有 TransmittableThreadLocal 变量(多租户上下文、Spring Security Context、数据权限上下文等)的快照,执行时自动还原。
wrapRunnable 某次迭代中加入了 TTL 支持,wrapSupplier 忘记同步修改,导致两者能力不对称。
1.5 各类上下文传播机制对比
| 机制 | 同线程继承 | 跨线程传播 | 线程池复用安全 | 典型用途 |
|---|---|---|---|---|
普通 ThreadLocal |
是 | 否 | 否(残留污染) | 局部状态 |
MDC(set+clear) |
是(clear 前) | 否 | 是(显式 clear) | 日志追踪 |
InheritableThreadLocal |
是 | 是(仅 new Thread) | 否(线程池复用无效) | 父子线程传递 |
TransmittableThreadLocal(TTL) |
是 | 是(需 TtlRunnable) | 是 | 多租户/安全上下文 |
| MDC 装饰器 Executor | — | 是(提交时捕获) | 是 | 统一 MDC 传播 |
第二章:陷阱二——CallerRunsPolicy 让整条调用链级联崩溃
2.1 为什么 catch(RejectedExecutionException) 是死代码
static CompletableFuture<Void> submitCoordinatorTask(Runnable task) {
try {
return CompletableFuture.runAsync(wrapRunnable(task), coordinatorPool);
} catch (RejectedExecutionException e) {
// ← 开发者认为:队列满了会被拒绝,catch 里做降级
log.error("协调任务提交失败,系统过载", e);
return CompletableFuture.failedFuture(e);
}
}
这段代码看起来做了完整的拒绝处理,实际上 catch 块永远不会执行。
原因在于 coordinatorPool 使用的是 CallerRunsPolicy:
// JDK 源码
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // ← 直接在调用线程上执行!不抛任何异常!
}
}
}
CallerRunsPolicy 队列满时不抛异常,而是直接在 submit() 的调用线程上运行任务。所以 catch 块是一段典型的死代码——开发者误以为有保护,实际上没有。
2.2 级联阻塞时序
AI 调用耗时 120s~600s 的场景下,CallerRunsPolicy 会引发严重的线程污染级联反应:
AI服务aiCallPoolcoordinatorPoolTomcat线程AI服务aiCallPoolcoordinatorPoolTomcat线程正常场景:高并发请求进来10个协调线程全部启动,各自提交AI调用max=15线程全满,阻塞300s+CallerRunsPolicy触发:新请求进来协调池有空位,正常接受队列也满!CallerRunsPolicy触发!协调线程直接执行httpCallAiService()阻塞300s!协调池也满了队列满!CallerRunsPolicy触发!Tomcat线程直接执行executeReviewTask()含同步IO,阻塞300s+HTTP接口响应开始变慢!submitCoordinatorTask(review-1)submitCoordinatorTask(review-2)callAiAsync x20submitCoordinatorTask(review-3)callAiAsync(review-3)submitCoordinatorTask(review-N)
级联路径清晰可见:
aiCallPool 满 + AI 调用耗时 300s → CallerRunsPolicy:coordinatorPool 线程被迫执行 HTTP 调用,阻塞 300s → coordinatorPool 所有线程占满 → CallerRunsPolicy:Tomcat 线程被迫执行 executeReviewTask() → Tomcat 线程阻塞,HTTP 接口响应变慢,影响全站
2.3 拒绝策略选型
| 策略 | 行为 | 适用场景 | 风险 |
|---|---|---|---|
CallerRunsPolicy |
调用线程直接执行 | 批处理、调用方可阻塞 | 高延迟场景下拖垮整条调用链 |
AbortPolicy(默认) |
抛 RejectedExecutionException |
调用方有明确降级逻辑 | 任务丢失 |
DiscardPolicy |
静默丢弃 | 允许丢失的统计类任务 | 任务丢失且无感知 |
BlockThenRejectPolicy(自定义) |
阻塞等待入队,超时才真正拒绝 | Web 服务后台任务 | 推荐 |
修复后,catch (RejectedExecutionException) 真正生效:
class BlockThenRejectPolicy implements RejectedExecutionHandler {
private final long timeout;
private final TimeUnit unit;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) throw new RejectedExecutionException("线程池已关闭");
try {
boolean enqueued = executor.getQueue().offer(r, timeout, unit);
if (!enqueued) throw new RejectedExecutionException("等待入队超时,系统过载");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("等待入队被中断", e);
}
}
}
第三章:陷阱三——不知道回调在哪个线程执行
3.1 CompletableFuture 回调线程的唯一规则
没有 executor 参数 → 在完成前一阶段的线程上就地执行 有
*Async+ executor → 提交到指定 executor 执行
callAiAsync(RULE, content)
// ↑ aiCallPool 线程执行(阻塞 120s~600s)
.whenComplete((r, ex) -> log.info("RULE 返回"))
// ↑ 同一个 aiCallPool 线程(无 executor → 就地继承)
callAiAsync(DYNAMIC, content)
.thenApply(raw -> parseResult(raw))
// ↑ 同一个 aiCallPool 线程(无 executor → 就地继承)
.thenComposeAsync(ids -> callNextAiAsync(ids), coordinatorPool)
// ↑ 提交到 coordinatorPool,在协调线程上执行
CompletableFuture.allOf(f1, f2, f3, f4)
.whenCompleteAsync((v, ex) -> mergeAndSave(...), coordinatorPool)
// ↑ 由「最后完成的子 Future 所在线程」提交到 coordinatorPool
3.2 回调线程模型全景
无executor(whenComplete)
有executor(whenCompleteAsync)
无executor(thenApply)
有executor(thenComposeAsync)
callAiAsync(RULE)
aiCallPool线程
httpCallAiService()
阻塞120s~600s
同一aiCallPool线程
执行日志回调
coordinatorPool线程
执行mergeAndSave
callAiAsync(DYNAMIC)
aiCallPool线程
httpCallAiService()
同一aiCallPool线程
parseResult
coordinatorPool线程
nextStageAsync
3.3 自引用线程池的死锁误区
很多人看到协调线程池提交任务给自己会担心死锁:
// 协调线程上,把子任务也提交到协调线程池 CompletableFuture.supplyAsync(() -> runSubTask(params), coordinatorPool); allOf(...).whenCompleteAsync(callback, coordinatorPool);
实际上不会死锁。原因:
executeReviewTask() 构建完所有 Future 链后立即返回 → 协调线程释放,回到线程池等待 → AI 调用结果到来时,whenCompleteAsync 提交的任务 能获取到空闲的协调线程来执行 → 不存在「线程 A 持有锁等线程 B,线程 B 等线程 A」的循环等待
但自引用在高负载下会带来性能退化:协调线程池接近满载时,自引用提交的子任务需要排队,增加整体延迟。
3.4 「提交者线程」是上下文传播的关键隐患
对于 *Async(executor) 形式,执行线程的上下文由提交者线程决定,而不是由 executor 本身决定:
提交者线程有正确 MDC → 可通过 Executor 装饰器传播 提交者线程 MDC 为空 → 装饰器捕获到空 context,执行线程也无 MDC
allOf(...).whenCompleteAsync(callback, pool) 的提交者是「最后完成的那个子 Future 的线程」,这个线程是运行时才能确定的不确定值。这是第五章陷阱的伏笔。
第四章:陷阱四——一个 Agent 失败,线程和等待时间全部浪费
4.1 allOf 的失败语义
// f2 在 200s 时异常
CompletableFuture<String> f1 = callAiAsync(RULE, content); // 预计 400s
CompletableFuture<String> f2 = callAiAsync(CONTENT, content); // 200s 时异常
CompletableFuture<String> f3 = callAiAsync(FORMAT, content); // 预计 350s
CompletableFuture.allOf(f1, f2, f3).whenCompleteAsync((v, ex) -> {
if (ex != null) {
updateStatus(recordId, FAILED);
// f2 失败,整体标记失败,但:
// f1 还需要 200s 才自然结束 → aiCallPool 线程被白白占用
// f3 还需要 150s 才自然结束 → aiCallPool 线程被白白占用
return;
}
mergeResults(f1, f2, f3);
}, coordinatorPool);
两个问题:
-
结果浪费:f1 和 f3 的分析结果无法使用
-
线程浪费:f1 和 f3 的 aiCallPool 线程无法服务新请求
4.2 cancel(false) 的本质与局限
CompletableFuture.cancel() 和大多数人理解的不同:
// CompletableFuture.cancel() 源码精简
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning 参数被忽略!
// 没有底层线程引用,无法发送 interrupt 信号
return internalComplete(new AltResult(new CancellationException()));
// 只是把 Future 状态设置为 CancellationException
// 底层的 httpCallAiService() 仍在运行!
}
| 方面 | FutureTask.cancel(true) |
CompletableFuture.cancel(any) |
|---|---|---|
| 中断底层线程 | 是(发送 interrupt) | 否(无底层线程引用) |
| Future 状态变化 | CancellationException |
CancellationException |
下游 thenApply |
不再执行 | 不再执行 |
| 底层 HTTP 连接 | 依赖 HTTP 客户端响应 interrupt | 不中断 |
cancel(false) 的实际价值:立即将 Future 状态标记为失败,阻止下游回调继续执行,但无法中断底层 HTTP 线程。
4.3 Fail-Fast + AtomicBoolean CAS 防重
List<CompletableFuture<?>> allFutures = Arrays.asList(f1, f2, f3, f4);
AtomicBoolean failed = new AtomicBoolean(false);
for (CompletableFuture<?> future : allFutures) {
future.whenComplete((result, ex) -> {
if (ex != null
&& !(ex instanceof CancellationException)
&& failed.compareAndSet(false, true)) { // CAS:只有第一个失败者执行
log.error("Agent 调用失败,触发 Fail-Fast,recordId={}", recordId, ex);
updateStatus(recordId, FAILED); // 立即通知用户
allFutures.forEach(f -> {
if (!f.isDone()) f.cancel(false); // 阻止后续回调
});
}
});
}
AI服务aiCallPool线程3 FORMATaiCallPool线程2 CONTENTaiCallPool线程1 RULEcoordinatorPoolAI服务aiCallPool线程3 FORMATaiCallPool线程2 CONTENTaiCallPool线程1 RULEcoordinatorPoolFuture链构建完,协调线程释放whenComplete触发compareAndSet(false,true)=true 首次失败HTTP线程不受影响,仍在等待AI响应allOf完成,whenCompleteAsync触发failed.get()=true,跳过合并直接returncallAiAsync(RULE)callAiAsync(CONTENT)callAiAsync(FORMAT)HTTP RULE调用(预计400s)HTTP CONTENT调用HTTP FORMAT调用(预计350s)200s后返回异常updateStatus(FAILED) 立即写库f1.cancel(false) 状态变CancellationExceptionf3.cancel(false) 状态变CancellationException400s后返回(结果被丢弃,无后续回调)350s后返回(同上)
Fail-Fast 的核心价值:将用户感知失败的时间,从「最慢 Agent 响应时间(600s)」缩短到「第一个失败 Agent 响应时间(200s)」。
compareAndSet 的作用:当多个 Agent 几乎同时失败时,保证只有第一个 CAS 成功的线程执行 updateStatus,避免重复写库。
第五章:陷阱五——两次错误修复引发的多租户串扰(最难排查)
前四个陷阱修复上线后,系统运行一段时间,开始出现一个新的、更诡异的问题。
5.1 现象:记录时存时不存在
线上告警日志:
[traceId-xxx] 保存审查记录成功,recordId: 2034597939818225665 (约 1.5 分钟后,AI 调用返回) [traceId-xxx] whenComplete 回调内异常,recordId: 2034597939818225665 ServiceException: 审查记录不存在
诡异之处在于:
-
记录明明已经保存成功(日志 1)
-
1.5 分钟后却查不到(日志 2)
-
直接用 DB 工具查,记录就在那里
5.2 诊断日志揭开真相
在回调方法的入口加入诊断日志,输出当前线程的上下文状态:
诊断日志: requestTenantId: 171 ← 方法参数,创建记录时的真实租户 currentTenantId: 2034250086314766337 ← TenantContextHolder 当前值(另一个用户!) currentTenantVisibility: absent ← 用当前租户上下文查不到记录 ignoreTenantVisibility: present(deleted=false, reviewStatus=1) ← 忽略租户后记录存在 thread: general-review-coordinator8
直接结论:记录在 DB 中真实存在,但系统使用了错误的 tenant_id 查询。
原来,系统使用 MyBatis Plus 的多租户插件,所有 selectById 都会被拦截器自动加上 AND tenant_id = ?,这个 ? 的值来自 TenantContextHolder.getTenantId()。
记录保存时:TenantContextHolder = 171 → tenant_id = 171 写入 DB 回调执行时:TenantContextHolder = 2034... → SELECT WHERE tenant_id = 2034... → 查不到
问题变成了:为什么回调线程上的 TenantContextHolder 是另一个用户的 ID?
5.3 修复一失败:MdcPropagatingExecutor 只传了 MDC
第一次修复思路:在 whenCompleteAsync 使用的 Executor 里同时传播上下文。
public class MdcPropagatingExecutor implements Executor {
@Override
public void execute(Runnable command) {
Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap(); // 只捕获 MDC
delegate.execute(() -> {
try {
if (mdcSnapshot != null) MDC.setContextMap(mdcSnapshot);
command.run();
// TenantContextHolder(TransmittableThreadLocal)完全没有传!
} finally {
MDC.clear();
}
});
}
}
问题:TenantContextHolder 基于 TransmittableThreadLocal,MdcPropagatingExecutor 只捕获了 MDC,TTL 上下文(包括租户 ID)完全没有传播。
上线后问题依然复现。
5.4 修复二失败:ContextPropagatingExecutor 捕获时机错了
第二次修复思路:在 Executor 的 execute() 里加入 TtlRunnable,同时传播 MDC 和 TTL。
public class ContextPropagatingExecutor implements Executor {
@Override
public void execute(Runnable command) {
Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
Runnable wrapped = TtlRunnable.get(() -> { // ← 此处捕获 TTL 快照
try {
if (mdcSnapshot != null) MDC.setContextMap(mdcSnapshot);
command.run();
} finally {
MDC.clear();
}
});
delegate.execute(wrapped);
}
}
这次逻辑上看起来完全正确:MDC 和 TTL 都传播了。但上线后问题依然在特定并发条件下复现。
为什么?
关键在于 execute() 被谁在什么时机调用。
开发者以为的调用时机: whenCompleteAsync 注册时 → 此时协调线程上下文正确(tenantId=171) 实际的调用时机: allOf 完成时 → 由完成最后一个子 Future 的 AI 线程调用 → TtlRunnable.get() 捕获的是此时 AI 线程的 TTL → AI 线程的 TTL 基线 = 另一个租户的脏值
5.5 根因:InheritableThreadLocal 基线继承 + 提交时机错位
这是整个问题链中最隐蔽的部分。
TransmittableThreadLocal 继承自 InheritableThreadLocal。Java 的 InheritableThreadLocal 有一个特性:线程创建时,子线程会从父线程继承所有 InheritableThreadLocal 的值。
线程池扩容时机: 某次高并发,父线程持有 tenantId = 2034...(另一个用户正在发请求) → 线程池新建工作线程 → 新线程从父线程继承 tenantId = 2034... 作为 InheritableThreadLocal 初始值 → 该值成为这条线程的「TTL 基线」,永久存在于该线程上
TtlRunnable 的工作机制是 save/restore:
执行前(replay): 备份子线程当前 TTL(= 脏基线 2034...) 注入父任务捕获的快照(= 正确值 171) 执行后(restore): 还原子线程 TTL = 脏基线 2034...(不是 null!)
因此,每次 TtlRunnable 任务结束,线程 TTL 都会恢复回脏基线。任何在这个线程上以普通 lambda 方式运行的代码,看到的都是这个脏基线。
完整的时序链路:
回调线程AI调用线程协调线程HTTP请求线程回调线程AI调用线程协调线程HTTP请求线程线程池扩容时继承父线程TTL基线 = tenant-B (脏)replay: 备份基线B, 注入171执行 executeReviewTask注册 whenCompleteAsync(callback, executor)此时 tenant=171 正确!方法返回restore: TTL恢复为基线B (脏)AI线程同样有自己的脏基线TtlRunnable.get() 在此时捕获捕获的是AI线程的脏基线!记录不存在!TtlRunnable(tenant-171)AI调用完成,TtlRunnable restore线程TTL = 自己的脏基线ContextPropagatingExecutor.execute(callback)回调执行,TTL = 脏基线selectById WHERE tenant_id = 脏值
两次修复失败的根本原因对比:
| 修复方案 | 失败原因 |
|---|---|
| MdcPropagatingExecutor | 只传 MDC,TTL(租户)完全未传 |
| ContextPropagatingExecutor + TtlRunnable | 在 Future 完成时捕获(execute() 被 AI 线程调用),捕获到的是 AI 线程的脏基线,相当于传了一份更高置信度的错误值 |
5.6 正确修复:上下文是数据,不是线程状态
两次修复都失败的根本原因在于:它们都试图从线程状态中获取正确的上下文,但线程状态在这个场景下是不可靠的。
正确的思路是:把上下文从线程状态中解耦,作为显式数据通过闭包传递。
tenantId 是 executeReviewTask 的方法参数,在注册 whenCompleteAsync 时就已固化为一个确定的值(171),完全不需要从线程状态中读取。
void executeReviewTask(Long recordId, Long tenantId) {
// ... 构建 futures ...
// tenantId 通过闭包捕获,与线程状态无关
CompletableFuture.allOf(...)
.whenCompleteAsync((v, ex) ->
TenantUtils.execute(tenantId, () -> { // 显式切换到正确租户
// 此处 TenantContextHolder.getTenantId() 永远返回 171
// 无论哪个线程跑这个回调,无论线程 TTL 基线是什么
mergeAndSave(recordId, ...);
}),
coordinatorPool);
}
TenantUtils.execute 实现(标准的 save/set/run/restore 模式):
public static void execute(Long tenantId, Runnable runnable) {
Long old = TenantContextHolder.getTenantId();
Boolean oldIgnore = TenantContextHolder.isIgnore();
try {
TenantContextHolder.setTenantId(tenantId);
TenantContextHolder.setIgnore(false);
runnable.run();
} finally {
TenantContextHolder.setTenantId(old); // 恢复,不污染线程
TenantContextHolder.setIgnore(oldIgnore);
}
}
三种方案的本质对比:
正确修复:TenantUtils.execute
注册时刻
tenantId已固化在闭包中
值=171,与线程无关
回调内显式setTenantId(171)
线程TTL基线是什么无所谓
租户ID永远正确
修复二:ContextPropagatingExecutor
Future完成时
execute()被AI线程调用
TtlRunnable.get()捕获TTL
TTL = AI线程脏基线
传播了一份脏值
租户ID仍然错误
修复一:MdcPropagatingExecutor
Future完成时
execute()被AI线程调用
捕获MDC
TTL = AI线程脏基线
租户ID错误
改动极小,效果根本:只需 2 处改动(加 import + 用 TenantUtils.execute 包裹回调体),覆盖回调内所有数据库操作,彻底切断对线程 TTL 状态的依赖。
第六章:从五个陷阱提炼的架构原则
6.1 上下文传播原则:显式优于隐式
| 传播方式 | 安全性 | 适用场景 |
|---|---|---|
| 隐式:依赖 ThreadLocal/TTL 在线程间「自动流动」 | 低(受线程创建时机、捕获时机、脏基线影响) | 同一线程内的短生命周期上下文 |
| 半显式:Executor 装饰器在提交时捕获快照 | 中(依赖提交者线程状态正确,捕获时机确定) | traceId 等无强业务约束的上下文 |
| 显式:参数 + 闭包 + 框架工具在回调内注入 | 高(与线程状态完全解耦,不受脏基线影响) | 租户 ID、用户 ID 等业务强相关上下文 |
架构建议:将多租户 ID、用户 ID 等具有业务含义的上下文,视为请求级别的数据而非线程级别的状态。在入口处提取后显式传递,在需要的地方显式注入,不依赖 ThreadLocal 「自动流动」。
6.2 背压设计原则:调用方不应承担工作线程角色
CallerRunsPolicy 的设计假设是「调用方可以阻塞」。这个假设在批处理、离线任务等场景是合理的,但在 Web 服务中,调用方是 Tomcat 线程或业务协调线程,它们的阻塞会直接影响用户可见的响应时间。
AI 调用这类高延迟任务,一旦让调用方线程充当工作线程,级联阻塱会沿调用栈向上传播,最终影响全站。
架构建议:线程池的拒绝策略必须与调用链上下游的容错策略匹配。对于 Web 服务后台任务,
BlockThenRejectPolicy(短暂等待后真正拒绝)是合理选择,它让catch (RejectedExecutionException)真正生效,让调用方有机会做降级处理。
6.3 可观测性原则:traceId 必须覆盖每一个线程边界
每一次 runAsync、supplyAsync、whenCompleteAsync 都是一次线程边界跨越,traceId 不会自动传递。未覆盖的边界就是日志黑洞——一旦出问题,你看不到整条链路,只能靠 recordId、时间戳等业务字段拼凑。
架构建议:封装统一的
wrapRunnable/wrapSupplier工具方法,所有异步提交必须经过该方法。Runnable 版和 Supplier 版要保持能力对齐(TTL 支持、MDC 传播),避免演化遗漏。可以通过 Code Review 规范或静态检查约束禁止直接裸提交。
6.4 多租户隔离原则:租户上下文是最高优先级的隔离边界
多租户系统中,任何一处租户上下文泄漏都可能导致 A 用户操作到 B 用户的数据。异步回调场景尤其危险,因为:
-
问题不会立即暴露,只在特定并发条件下触发(高并发时线程池扩容,脏基线才会出现)
-
症状看起来像「记录不存在」,不像「租户串扰」,极易误导排查方向
-
两次看起来合理的修复都没有解决它,因为根因在线程模型的底层
架构建议:异步回调中任何涉及数据库操作的逻辑,一律用显式
tenantId参数 + 框架提供的TenantUtils.execute注入,不依赖线程环境中的TenantContextHolder。租户上下文的正确性不应依赖运行时的线程状态。
第七章:诊断方法论——四个追问
排查异步并发问题不靠运气,靠的是一套可复用的追问方式。
追问一:「现在是哪个线程?它有什么上下文?」
每当看到 thenApply、whenComplete、supplyAsync、*Async(executor) 时,强迫自己停下来回答:
-
这段代码运行在哪个线程?(线程池名称?)
-
那个线程是在哪里创建/复用的?
-
那个线程上有没有 MDC?有没有 TTL 上下文?TTL 基线是什么?
-
是谁提交了这个任务?提交者线程有正确的上下文吗?
以本文问题为例:
问:f1.whenComplete((r,ex) -> log.info(...)) 在哪个线程? 答:完成 f1 的 aiCallPool 线程(无 executor → 就地继承) 问:那个 aiCallPool 线程有 MDC 吗? 答:没有!wrapSupplier 的 finally 刚刚 clear 了 问:所以这条日志有 traceId 吗? 答:没有 → 发现丢失点 1
追问二:「跨线程边界时,什么会丢失?」
每次任务跨越线程边界(提交到线程池),要追问:
| 上下文类型 | 是否自动跨线程 | 正确的传播方式 |
|---|---|---|
| MDC(traceId) | 否 | wrapWithContext 手动 copy,或 MDC Executor 装饰器 |
普通 ThreadLocal |
否 | 不建议跨线程,改用 TTL |
TransmittableThreadLocal |
是(需 TtlRunnable) | TtlRunnable.get() 或显式参数 |
| Spring Security Context | 否(默认) | 配置 MODE_INHERITABLETHREADLOCAL 或 TTL |
Spring @Transactional |
否 | 事务不能跨线程,异步方法不加 @Transactional |
| 租户 ID(业务相关) | 否(可靠方式) | 显式参数 + 闭包 + TenantUtils.execute |
追问三:「多线程修改同一状态时,用了什么保护?」
在 whenComplete 回调中修改共享状态时,要追问:
-
这个状态会被多个线程同时修改吗?
-
用的是哪种线程安全机制?
// 反例:多个 Agent 同时失败,并发写库多次
future.whenComplete((r, ex) -> {
if (ex != null) updateStatus(FAILED); // ← 没有保护,可能重复执行
});
// 正例:AtomicBoolean.compareAndSet 保证只执行一次
AtomicBoolean failed = new AtomicBoolean(false);
future.whenComplete((r, ex) -> {
if (ex != null && failed.compareAndSet(false, true)) {
updateStatus(FAILED); // ← 只有第一个 CAS 成功的线程执行
}
});
常见的并发状态保护手段:
局部变量(effectively-final): 天然安全,Lambda 中直接使用 AtomicBoolean / AtomicInteger: 单变量的原子读写/CAS ConcurrentHashMap: 并发 Map synchronized / ReentrantLock: 临界区保护 CompletableFuture 本身: 内部 CAS 保证 happens-before
追问四:「上下文依赖的是线程状态,还是显式的数据?」
这是本次新增的追问,也是最容易被忽视的一个。
每当写 executor.execute(callback) 时,问: → execute() 是在「正确上下文的线程」上调用的吗? → 还是在「Future 完成时的某个不确定线程」上调用的? → 如果是后者,依赖 Executor 装饰器传播上下文可能是错的 每当写 TtlRunnable.get(task) 时,问: → get() 在哪个线程上调用?此刻那个线程的 TTL 是正确的吗? → 还是已经被 restore 回了脏基线? 更根本地问: → 这个上下文值(tenantId、userId)能不能作为显式参数传递? → 如果能,就用参数+闭包,不要依赖线程状态 → 线程状态是「共享的、易污染的、时机敏感的」,显式参数是「确定的、不可变的、与线程无关的」
追问四的一个快速检查清单
| 场景 | 应使用 | 原因 |
|---|---|---|
| traceId 传到异步回调 | Executor 装饰器(半显式) | traceId 无业务强约束,MDC 泄漏影响可控 |
| 租户 ID 传到异步回调 | 显式参数 + TenantUtils.execute | 租户错误会导致跨租户数据访问,影响严重 |
| 用户 ID 传到异步回调 | 显式参数 | 同上 |
| 日志级别的上下文 | 隐式 TTL | 丢失只影响可观测性,不影响正确性 |
第八章:总结
五个陷阱速查表
| 问题现象 | 根因 | 解决方案 |
|---|---|---|
| 日志 traceId 消失(丢失点 1) | wrapSupplier finally 中 MDC.clear(),清掉同线程后续回调的上下文 |
移除 finally 中的 MDC.clear(),依赖下次任务覆盖 |
| 日志 traceId 消失(丢失点 2) | 裸 supplyAsync 提交,执行线程无 MDC |
MdcPropagatingExecutor 装饰器统一处理 |
| 日志 traceId 消失(丢失点 3) | whenCompleteAsync 提交者线程 MDC 不确定 |
同上(装饰器统一处理) |
| TTL 上下文丢失(租户/Security) | wrapSupplier 未用 TtlRunnable,与 wrapRunnable 不对称 |
补充 TtlRunnable 封装,两者能力对齐 |
| Tomcat 线程被 AI 调用拖死 | CallerRunsPolicy 在高负载下让 Coordinator/Tomcat 线程变成 AI HTTP 工作线程 |
改用 BlockThenRejectPolicy,让 catch 块真正生效 |
| Agent 失败后等待时间过长 | allOf 任一失败不 cancel 其他 Future,线程资源浪费 |
Fail-Fast + AtomicBoolean.compareAndSet + cancel(false) |
| whenCompleteAsync 回调中"记录不存在",忽略租户后记录存在 | TransmittableThreadLocal 继承脏 TTL 基线;ContextPropagatingExecutor 在 Future 完成时捕获,拿到的是脏值;两次修复均未解决 | 用显式 tenantId 参数 + TenantUtils.execute() 在回调内注入,彻底脱离线程状态 |
上下文传播决策树
否
是
否
是
否
是
上下文需要跨越
线程边界吗?
使用 ThreadLocal
同线程内安全
这个上下文有
业务强一致要求吗?
(如租户ID、用户ID)
traceId等可观测性
上下文
Executor装饰器
提交时捕获MDC快照
注意提交者线程时机
上下文值在注册
回调时是否确定?
重新设计:将值
作为参数显式传入
闭包捕获 + 框架工具注入
TenantUtils.execute(tenantId)
与线程状态完全解耦
四条核心结论
-
MDC 是线程局部的,传播需要显式机制。
MdcPropagatingExecutor装饰器是最低耦合的方案,业务代码零侵入。但要注意它依赖提交者线程有正确的 MDC,捕获时机错误会导致传播失效。 -
CallerRunsPolicy是隐形的「线程污染器」,在高延迟场景下会把整条调用链上的所有线程变成工作线程,引发级联阻塞,且配套的catch (RejectedExecutionException)是死代码。 -
CompletableFuture.cancel()≠ 中断 HTTP,它只是标记 Future 状态为失败,阻止下游回调链,底层线程仍在阻塞。Fail-Fast 的价值是「第一时间通知用户失败」,而不是真正释放资源。 -
上下文是数据,不是线程状态。
TransmittableThreadLocal在线程池中存在基线继承的隐患,Executor 装饰器的捕获时机可能是错误的。对于业务强相关的上下文(租户 ID、用户 ID),应将其从隐式线程状态提升为显式数据——随参数传递,随闭包捕获,用框架工具在需要的时候注入,而不是依赖线程「自动携带」。
诊断方法论四个追问
追问一:现在是哪个线程?它有什么上下文?TTL 基线是什么?
追问二:跨线程边界时,什么会丢失?
追问三:多线程修改同一状态时,用了什么保护?
追问四:上下文依赖的是线程状态,还是显式的数据?
把这四个追问变成代码审查和问题排查的习惯,能帮你在大多数异步并发问题出现之前就发现并修复它们。
如果本文对你有帮助,欢迎点赞收藏。 如有疑问或不同见解,欢迎在评论区交流。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)