线程安全 ≠ 协程安全:当全局缓存同时遇上线程池和 async,优秀 Python 工程师该如何设计?
线程安全 ≠ 协程安全:当全局缓存同时遇上线程池和 async,优秀 Python 工程师该如何设计?
Python 让很多人第一次感受到编程的温柔:语法简洁,生态丰富,既能写 Web 服务,也能做数据分析、自动化脚本、机器学习和 AI 应用。但越是“简单好用”的语言,越容易在工程深水区暴露细节。
比如今天这个问题:
一个全局缓存,既会被线程池中的同步任务访问,又会被
asyncio异步上下文访问。
线程安全和协程安全有什么区别?threading.Lock和asyncio.Lock能混用吗?
这是一个非常典型的 Python 实战问题。它不只是锁的选择问题,而是并发模型、资源边界和工程责任的问题。
一、先给结论:不能简单混用
threading.Lock 和 asyncio.Lock 不能当成同一种锁混用。
更准确地说:
threading.Lock 保护的是线程之间的共享资源。
asyncio.Lock 保护的是同一个事件循环内协程之间的共享资源。
它们面对的调度模型不同:
线程:由操作系统抢占式调度,可能在任意时刻被切换。
协程:由事件循环协作式调度,通常在 await 处让出执行权。
因此,在线程池和异步上下文同时访问全局缓存时,最危险的做法是:
# 错误倾向:以为用了 asyncio.Lock 就能保护线程池里的访问
cache = {}
async_lock = asyncio.Lock()
或者:
# 错误倾向:在 async 函数中直接用 threading.Lock 包住 await
thread_lock = threading.Lock()
async def bad():
with thread_lock:
await some_io()
前者保护不了线程池里的同步代码,后者可能阻塞事件循环,甚至造成死锁和吞吐下降。
二、线程安全:面对“抢占式切换”的安全
线程安全关注的是:多个线程同时访问同一份可变数据时,程序是否仍然正确。
例如一个全局缓存:
cache = {}
def get_user(user_id):
if user_id not in cache:
cache[user_id] = load_user_from_db(user_id)
return cache[user_id]
这段代码看似简单,但在多线程下有竞态条件。
两个线程可能同时发现 user_id not in cache,然后同时去数据库加载,再同时写入缓存。轻则重复请求,重则写入脏数据。
正确做法之一是使用 threading.Lock:
import threading
cache = {}
cache_lock = threading.Lock()
def get_user(user_id):
with cache_lock:
if user_id not in cache:
cache[user_id] = load_user_from_db(user_id)
return cache[user_id]
但这里还有一个性能问题:如果 load_user_from_db 很慢,那么锁会被长时间持有,其他线程都被挡住。
更好的做法是区分“检查缓存”和“加载数据”的边界:
import threading
cache = {}
cache_lock = threading.Lock()
def get_user(user_id):
with cache_lock:
user = cache.get(user_id)
if user is not None:
return user
user = load_user_from_db(user_id)
with cache_lock:
existing = cache.get(user_id)
if existing is not None:
return existing
cache[user_id] = user
return user
这不是完美的单飞请求合并,但比长时间持锁更健康。它体现了一个重要原则:
锁应该尽量保护共享状态,而不是保护整个业务流程。
三、协程安全:面对“await 让出执行权”的安全
协程安全关注的是:多个协程在同一个事件循环中交替运行时,共享状态是否仍然正确。
很多初学者以为:
async 是单线程,所以不会有并发问题。
这也是误区。
看这个例子:
counter = 0
async def increase():
global counter
value = counter
await asyncio.sleep(0)
counter = value + 1
如果同时运行多个协程:
import asyncio
counter = 0
async def increase():
global counter
value = counter
await asyncio.sleep(0)
counter = value + 1
async def main():
await asyncio.gather(*(increase() for _ in range(1000)))
print(counter)
asyncio.run(main())
你可能期待输出 1000,但结果可能远小于 1000。
原因是:协程虽然不是被操作系统随时打断,但它会在 await 处主动让出控制权。多个协程可能读取到同一个旧值,然后覆盖彼此的更新。
此时应该使用 asyncio.Lock:
import asyncio
counter = 0
lock = asyncio.Lock()
async def increase():
global counter
async with lock:
value = counter
await asyncio.sleep(0)
counter = value + 1
不过这里也有一个实践提醒:不要在锁内做太多耗时 I/O。虽然 asyncio.Lock 不会阻塞整个线程,但它会让其他等待同一把锁的协程排队。
四、线程安全和协程安全的核心差异
可以用一张表概括。
| 对比项 | 线程安全 | 协程安全 |
|---|---|---|
| 调度方式 | 操作系统抢占式调度 | 事件循环协作式调度 |
| 切换时机 | 理论上可能发生在很多地方 | 通常发生在 await 处 |
| 常用锁 | threading.Lock、RLock |
asyncio.Lock |
| 是否阻塞线程 | 会阻塞当前线程 | 不阻塞事件循环线程,但会挂起协程 |
| 保护范围 | 多线程共享数据 | 同一事件循环内的协程共享数据 |
| 是否跨线程有效 | 是 | 否,不应用于线程池同步代码 |
| 是否可在 async 中随便用 | 不建议 | 是 async 场景首选 |
一句话:
threading.Lock 是给线程看的。
asyncio.Lock 是给协程看的。
它们不是替代关系,而是服务于不同并发世界的工具。
五、错误示例:在 async 中直接使用 threading.Lock
假设你写了这样的代码:
import threading
import asyncio
cache = {}
lock = threading.Lock()
async def get_value(key):
with lock:
if key in cache:
return cache[key]
value = await fetch_from_remote(key)
cache[key] = value
return value
这段代码有两个大问题。
第一,threading.Lock 的获取是阻塞式的。如果另一个线程持有这把锁,当前事件循环线程会被卡住,整个 async 服务都可能变慢。
第二,更严重的是,你在持有 threading.Lock 的时候执行了 await。这意味着当前协程让出了执行权,但锁还没释放。其他线程或代码如果也在等待这把锁,就可能形成非常难排查的阻塞链。
正确原则是:
不要在 threading.Lock 保护区内 await。
不要用阻塞锁包裹异步 I/O。
六、错误示例:用 asyncio.Lock 保护线程池共享数据
再看另一个错误方向:
import asyncio
from concurrent.futures import ThreadPoolExecutor
cache = {}
lock = asyncio.Lock()
def worker(key):
# 线程池里的同步函数无法 async with lock
if key not in cache:
cache[key] = compute(key)
return cache[key]
asyncio.Lock 必须通过 async with 和 await 使用,它属于事件循环机制。线程池里的同步函数无法直接等待它。
即使你想强行绕,也会让代码变得脆弱、复杂,而且容易出现事件循环绑定、跨线程调度和死锁问题。
正确原则是:
asyncio.Lock 不能保护线程池里的普通同步函数。
线程池访问共享数据时,需要线程锁或更清晰的资源所有权模型。
七、真实场景:全局缓存同时被线程池和 async 访问
假设我们有一个图片处理服务。
API 层是异步的:
@app.get("/image/{image_id}")
async def get_image(image_id: str):
...
但图像处理是 CPU 密集型,会丢到线程池或进程池中:
result = await loop.run_in_executor(pool, process_image, image_id)
同时,API 层和 worker 都要访问一个全局缓存:
cache = {
"image:001": b"..."
}
这时如果随便在 async 代码里用 asyncio.Lock,线程池不受保护;如果全局都用 threading.Lock,async API 又可能被阻塞。
怎么办?
推荐三种方案。
八、方案一:统一用线程安全缓存,async 层通过 to_thread 访问
如果缓存必须被线程池和 async 层共同访问,可以把缓存设计成一个纯同步、线程安全的组件。
import threading
import time
from typing import Any
class ThreadSafeTTLCache:
def __init__(self, ttl_seconds: int = 300):
self._data: dict[str, tuple[float, Any]] = {}
self._ttl = ttl_seconds
self._lock = threading.RLock()
def get(self, key: str) -> Any | None:
now = time.time()
with self._lock:
item = self._data.get(key)
if item is None:
return None
expire_at, value = item
if expire_at < now:
del self._data[key]
return None
return value
def set(self, key: str, value: Any) -> None:
expire_at = time.time() + self._ttl
with self._lock:
self._data[key] = (expire_at, value)
def delete(self, key: str) -> None:
with self._lock:
self._data.pop(key, None)
线程池中可以直接使用:
cache = ThreadSafeTTLCache()
def process_image(image_id: str):
cached = cache.get(image_id)
if cached is not None:
return cached
result = heavy_compute(image_id)
cache.set(image_id, result)
return result
异步上下文中不要直接调用可能阻塞的锁竞争逻辑,可以通过 asyncio.to_thread 转到线程中执行:
import asyncio
async def async_get_cache(key: str):
return await asyncio.to_thread(cache.get, key)
async def async_set_cache(key: str, value):
await asyncio.to_thread(cache.set, key, value)
API 层:
async def get_image_result(image_id: str):
cached = await async_get_cache(image_id)
if cached is not None:
return cached
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, process_image, image_id)
return result
这个方案的好处是:共享状态只有一种保护规则:线程锁。
缺点是:async 访问缓存时有线程切换开销。但对于复杂服务来说,这个开销通常比锁模型混乱更可控。
九、方案二:缓存只属于事件循环,线程通过安全通道访问
另一种思路是:明确缓存归属权。
比如规定:
缓存只允许在 async 事件循环中被直接访问。
线程池不能直接读写缓存。
线程池要访问缓存,必须通过事件循环提交请求。
可以使用 asyncio.run_coroutine_threadsafe:
import asyncio
from typing import Any
class AsyncCache:
def __init__(self):
self._data: dict[str, Any] = {}
self._lock = asyncio.Lock()
async def get(self, key: str):
async with self._lock:
return self._data.get(key)
async def set(self, key: str, value: Any):
async with self._lock:
self._data[key] = value
在线程中访问时:
def worker_get_cache(loop, cache: AsyncCache, key: str):
future = asyncio.run_coroutine_threadsafe(cache.get(key), loop)
return future.result()
这个方案更适合缓存强依赖 async 生命周期的场景。
但它也有风险:线程会等待事件循环返回结果。如果事件循环又在等待线程池结果,就可能形成循环等待。因此必须谨慎设计调用方向。
十、方案三:把缓存独立成服务或 Actor
在更复杂的系统里,我更推荐把共享状态变成一个“独立所有者”。
示意图:
Async API 层
|
| 请求缓存
v
Cache Manager / Actor
^
| 请求缓存
Thread Pool Worker
缓存不再是一个到处可访问的全局 dict,而是由一个专门组件管理。其他模块通过方法、队列、RPC 或消息传递访问它。
这种思想叫:
不要共享内存来通信。
而是通过通信来共享状态。
在 Python 项目中,可以用:
本地队列
专门的缓存管理线程
Redis
Memcached
数据库缓存表
消息队列
如果是多进程部署,进程内全局缓存本来就不可靠,因为每个进程都有自己的内存副本。此时 Redis 这类外部缓存往往更合适。
十一、最佳实践:锁要短,边界要清晰
无论使用线程锁还是协程锁,都要遵守几个原则。
1. 不要在锁里做慢操作
不推荐:
with lock:
value = slow_network_call()
cache[key] = value
推荐:
value = slow_network_call()
with lock:
cache[key] = value
对于 async 也是一样。
不推荐:
async with lock:
value = await fetch_data()
cache[key] = value
推荐:
value = await fetch_data()
async with lock:
cache[key] = value
当然,这可能带来重复加载问题。如果要避免重复加载,需要引入更精细的 per-key lock 或 singleflight 模式。
2. 锁保护的是不变量
比如缓存的不变量可能是:
_data 中每个 key 对应的 value 必须没有过期。
_hits 和 _misses 的统计必须和访问行为一致。
某个 key 的构建过程不能重复执行。
锁不是为了“看起来安全”,而是为了保护这些不变量不被并发破坏。
3. 避免全局大锁
全局一把锁最简单,但可能限制吞吐。
可以使用 per-key lock:
import threading
from collections import defaultdict
cache = {}
locks = defaultdict(threading.Lock)
def get_or_compute(key):
with locks[key]:
if key in cache:
return cache[key]
value = compute(key)
cache[key] = value
return value
这样不同 key 之间可以并发执行。
但要注意:defaultdict(threading.Lock) 自身在极端并发下也可能需要保护,生产环境可以用更严谨的锁管理器。
4. async 中使用 per-key lock
异步版本:
import asyncio
from collections import defaultdict
cache = {}
locks = defaultdict(asyncio.Lock)
async def get_or_fetch(key):
async with locks[key]:
if key in cache:
return cache[key]
value = await fetch_value(key)
cache[key] = value
return value
这适用于纯 async 场景。
但如果线程池也要访问同一个 cache,这个方案仍然不够,因为 asyncio.Lock 不保护线程。
十二、threading.Lock 与 asyncio.Lock 到底能不能混用?
答案要分层说。
不能把它们当成同一把锁混用
不可以这样理解:
我 async 代码用 asyncio.Lock。
线程代码用 threading.Lock。
它们锁的是同一个 cache,所以应该安全。
这是错误的。
如果两个锁不是同一套互斥机制,就可能出现:
协程拿到了 asyncio.Lock。
线程拿到了 threading.Lock。
二者同时修改同一个 cache。
这等于没有真正互斥。
可以在系统中同时存在,但必须边界清晰
一个服务里当然可以同时使用两种锁:
asyncio.Lock 保护 async 内部状态。
threading.Lock 保护线程共享状态。
但同一份共享数据,最好只归属于一种并发模型。
如果一份数据必须跨线程和协程共享,通常优先选择:
用 threading.Lock 作为底层保护;
async 侧通过 to_thread 或专用适配层访问;
或者彻底改为消息传递 / 外部缓存。
十三、一个推荐模板:混合场景下的缓存访问层
下面是一个更接近生产代码的模板。
import asyncio
import threading
import time
from typing import Callable, TypeVar, Generic
T = TypeVar("T")
class SafeCache(Generic[T]):
def __init__(self, ttl_seconds: int = 300):
self._ttl = ttl_seconds
self._data: dict[str, tuple[float, T]] = {}
self._lock = threading.RLock()
def get(self, key: str) -> T | None:
now = time.time()
with self._lock:
item = self._data.get(key)
if item is None:
return None
expire_at, value = item
if expire_at < now:
self._data.pop(key, None)
return None
return value
def set(self, key: str, value: T) -> None:
expire_at = time.time() + self._ttl
with self._lock:
self._data[key] = (expire_at, value)
def get_or_compute(self, key: str, factory: Callable[[], T]) -> T:
value = self.get(key)
if value is not None:
return value
new_value = factory()
self.set(key, new_value)
return new_value
class AsyncCacheAdapter(Generic[T]):
def __init__(self, cache: SafeCache[T]):
self._cache = cache
async def get(self, key: str) -> T | None:
return await asyncio.to_thread(self._cache.get, key)
async def set(self, key: str, value: T) -> None:
await asyncio.to_thread(self._cache.set, key, value)
async def get_or_compute(self, key: str, factory: Callable[[], T]) -> T:
return await asyncio.to_thread(self._cache.get_or_compute, key, factory)
使用方式:
cache = SafeCache[bytes](ttl_seconds=600)
async_cache = AsyncCacheAdapter(cache)
def sync_worker(image_id: str) -> bytes:
return cache.get_or_compute(
image_id,
lambda: process_image_sync(image_id)
)
async def async_handler(image_id: str) -> bytes:
cached = await async_cache.get(image_id)
if cached is not None:
return cached
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, sync_worker, image_id)
这个设计的关键是:
真正的数据结构只由 SafeCache 管。
SafeCache 只使用 threading.RLock。
async 层不直接碰锁,只通过适配器访问。
这比“到处加锁”更清晰。
十四、调试与测试:不要相信感觉,要制造竞争
并发 bug 最可怕的地方是:它们不一定每次复现。
你可以写压力测试:
from concurrent.futures import ThreadPoolExecutor
import asyncio
import random
cache = SafeCache[int]()
def sync_task(i: int):
key = f"k-{random.randint(1, 10)}"
cache.set(key, i)
return cache.get(key)
async def async_task(i: int):
key = f"k-{random.randint(1, 10)}"
await asyncio.to_thread(cache.set, key, i)
return await asyncio.to_thread(cache.get, key)
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=8) as pool:
thread_jobs = [
loop.run_in_executor(pool, sync_task, i)
for i in range(1000)
]
async_jobs = [
async_task(i)
for i in range(1000)
]
results = await asyncio.gather(*thread_jobs, *async_jobs)
print(len(results))
asyncio.run(main())
同时观察:
是否出现 KeyError
是否出现 RuntimeError
是否有死锁
是否事件循环卡顿
CPU 是否异常升高
P95 / P99 延迟是否恶化
优秀工程师不会只说“我觉得这样安全”,而是会用测试和监控证明它安全。
十五、给初学者的记忆口诀
可以记住这几句话:
线程锁管线程,协程锁管协程。
asyncio.Lock 不保护线程池。
threading.Lock 可能阻塞事件循环。
不要在 threading.Lock 里 await。
同一份共享状态,最好只有一个所有者。
不确定时,先画出数据流,再决定锁。
这几句话比死背 API 更重要。
十六、前沿视角:未来 Python 并发会更重要
Python 生态正在持续向高性能和多场景并发演进。FastAPI、asyncio、AnyIO、Trio、Pandas、NumPy、PyTorch、任务队列、GPU 计算、边缘设备和 AI 服务,都让开发者越来越频繁地面对混合并发模型。
但趋势越热,越要回到基本功。
真正的 Python 最佳实践不是“所有项目都 async”,也不是“所有地方都加锁”,而是:
理解瓶颈。
明确资源归属。
减少共享状态。
缩小锁范围。
用测试验证假设。
在复杂系统中,技术判断本身就是一种工程温度。你写下的每一把锁,都是未来维护者要理解的一段承诺。
十七、总结:优秀工程师要能把“安全边界”讲清楚
回到最初的问题:
线程安全与协程安全有什么差异?
threading.Lock与asyncio.Lock能混用吗?
最终答案是:
线程安全解决多线程抢占式并发下的共享数据问题。
协程安全解决同一事件循环中多个协程在 await 切换下的共享数据问题。
threading.Lock 和 asyncio.Lock 可以在同一系统中同时存在,但不能随意混用来保护同一份共享状态。
对于“全局缓存同时被线程池和异步上下文访问”这个场景,我更推荐:
方案一:用线程安全缓存作为底层唯一真相,async 层通过 to_thread 访问。
方案二:缓存归 async 事件循环所有,线程通过安全通道提交请求。
方案三:把缓存外置为 Redis / Memcached / 独立缓存服务。
不要让一个全局 dict 成为系统里最脆弱、最隐蔽、最难排查的地方。
Python 编程的高级感,不在于你用了多少新语法,而在于你能不能在复杂场景中写出清晰、可靠、可维护的代码。
互动问题
你在项目中是否遇到过“明明加了锁,还是出现并发 bug”的情况?
你更倾向于在混合并发场景中使用本地缓存,还是直接引入 Redis 这类外部缓存?
欢迎在评论区分享你的经验。很多时候,工程能力就是在这些真实问题里一点点长出来的。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)