线程安全 ≠ 协程安全:当全局缓存同时遇上线程池和 async,优秀 Python 工程师该如何设计?

Python 让很多人第一次感受到编程的温柔:语法简洁,生态丰富,既能写 Web 服务,也能做数据分析、自动化脚本、机器学习和 AI 应用。但越是“简单好用”的语言,越容易在工程深水区暴露细节。

比如今天这个问题:

一个全局缓存,既会被线程池中的同步任务访问,又会被 asyncio 异步上下文访问。
线程安全和协程安全有什么区别?
threading.Lockasyncio.Lock 能混用吗?

这是一个非常典型的 Python 实战问题。它不只是锁的选择问题,而是并发模型、资源边界和工程责任的问题。


一、先给结论:不能简单混用

threading.Lockasyncio.Lock 不能当成同一种锁混用

更准确地说:

threading.Lock 保护的是线程之间的共享资源。
asyncio.Lock 保护的是同一个事件循环内协程之间的共享资源。

它们面对的调度模型不同:

线程:由操作系统抢占式调度,可能在任意时刻被切换。
协程:由事件循环协作式调度,通常在 await 处让出执行权。

因此,在线程池和异步上下文同时访问全局缓存时,最危险的做法是:

# 错误倾向:以为用了 asyncio.Lock 就能保护线程池里的访问
cache = {}
async_lock = asyncio.Lock()

或者:

# 错误倾向:在 async 函数中直接用 threading.Lock 包住 await
thread_lock = threading.Lock()

async def bad():
    with thread_lock:
        await some_io()

前者保护不了线程池里的同步代码,后者可能阻塞事件循环,甚至造成死锁和吞吐下降。


二、线程安全:面对“抢占式切换”的安全

线程安全关注的是:多个线程同时访问同一份可变数据时,程序是否仍然正确。

例如一个全局缓存:

cache = {}

def get_user(user_id):
    if user_id not in cache:
        cache[user_id] = load_user_from_db(user_id)
    return cache[user_id]

这段代码看似简单,但在多线程下有竞态条件。

两个线程可能同时发现 user_id not in cache,然后同时去数据库加载,再同时写入缓存。轻则重复请求,重则写入脏数据。

正确做法之一是使用 threading.Lock

import threading

cache = {}
cache_lock = threading.Lock()

def get_user(user_id):
    with cache_lock:
        if user_id not in cache:
            cache[user_id] = load_user_from_db(user_id)
        return cache[user_id]

但这里还有一个性能问题:如果 load_user_from_db 很慢,那么锁会被长时间持有,其他线程都被挡住。

更好的做法是区分“检查缓存”和“加载数据”的边界:

import threading

cache = {}
cache_lock = threading.Lock()

def get_user(user_id):
    with cache_lock:
        user = cache.get(user_id)

    if user is not None:
        return user

    user = load_user_from_db(user_id)

    with cache_lock:
        existing = cache.get(user_id)
        if existing is not None:
            return existing

        cache[user_id] = user
        return user

这不是完美的单飞请求合并,但比长时间持锁更健康。它体现了一个重要原则:

锁应该尽量保护共享状态,而不是保护整个业务流程。


三、协程安全:面对“await 让出执行权”的安全

协程安全关注的是:多个协程在同一个事件循环中交替运行时,共享状态是否仍然正确。

很多初学者以为:

async 是单线程,所以不会有并发问题。

这也是误区。

看这个例子:

counter = 0

async def increase():
    global counter

    value = counter
    await asyncio.sleep(0)
    counter = value + 1

如果同时运行多个协程:

import asyncio

counter = 0

async def increase():
    global counter
    value = counter
    await asyncio.sleep(0)
    counter = value + 1

async def main():
    await asyncio.gather(*(increase() for _ in range(1000)))
    print(counter)

asyncio.run(main())

你可能期待输出 1000,但结果可能远小于 1000。

原因是:协程虽然不是被操作系统随时打断,但它会在 await 处主动让出控制权。多个协程可能读取到同一个旧值,然后覆盖彼此的更新。

此时应该使用 asyncio.Lock

import asyncio

counter = 0
lock = asyncio.Lock()

async def increase():
    global counter

    async with lock:
        value = counter
        await asyncio.sleep(0)
        counter = value + 1

不过这里也有一个实践提醒:不要在锁内做太多耗时 I/O。虽然 asyncio.Lock 不会阻塞整个线程,但它会让其他等待同一把锁的协程排队。


四、线程安全和协程安全的核心差异

可以用一张表概括。

对比项 线程安全 协程安全
调度方式 操作系统抢占式调度 事件循环协作式调度
切换时机 理论上可能发生在很多地方 通常发生在 await
常用锁 threading.LockRLock asyncio.Lock
是否阻塞线程 会阻塞当前线程 不阻塞事件循环线程,但会挂起协程
保护范围 多线程共享数据 同一事件循环内的协程共享数据
是否跨线程有效 否,不应用于线程池同步代码
是否可在 async 中随便用 不建议 是 async 场景首选

一句话:

threading.Lock 是给线程看的。
asyncio.Lock 是给协程看的。

它们不是替代关系,而是服务于不同并发世界的工具。


五、错误示例:在 async 中直接使用 threading.Lock

假设你写了这样的代码:

import threading
import asyncio

cache = {}
lock = threading.Lock()

async def get_value(key):
    with lock:
        if key in cache:
            return cache[key]

        value = await fetch_from_remote(key)
        cache[key] = value
        return value

这段代码有两个大问题。

第一,threading.Lock 的获取是阻塞式的。如果另一个线程持有这把锁,当前事件循环线程会被卡住,整个 async 服务都可能变慢。

第二,更严重的是,你在持有 threading.Lock 的时候执行了 await。这意味着当前协程让出了执行权,但锁还没释放。其他线程或代码如果也在等待这把锁,就可能形成非常难排查的阻塞链。

正确原则是:

不要在 threading.Lock 保护区内 await。
不要用阻塞锁包裹异步 I/O。

六、错误示例:用 asyncio.Lock 保护线程池共享数据

再看另一个错误方向:

import asyncio
from concurrent.futures import ThreadPoolExecutor

cache = {}
lock = asyncio.Lock()

def worker(key):
    # 线程池里的同步函数无法 async with lock
    if key not in cache:
        cache[key] = compute(key)
    return cache[key]

asyncio.Lock 必须通过 async withawait 使用,它属于事件循环机制。线程池里的同步函数无法直接等待它。

即使你想强行绕,也会让代码变得脆弱、复杂,而且容易出现事件循环绑定、跨线程调度和死锁问题。

正确原则是:

asyncio.Lock 不能保护线程池里的普通同步函数。
线程池访问共享数据时,需要线程锁或更清晰的资源所有权模型。

七、真实场景:全局缓存同时被线程池和 async 访问

假设我们有一个图片处理服务。

API 层是异步的:

@app.get("/image/{image_id}")
async def get_image(image_id: str):
    ...

但图像处理是 CPU 密集型,会丢到线程池或进程池中:

result = await loop.run_in_executor(pool, process_image, image_id)

同时,API 层和 worker 都要访问一个全局缓存:

cache = {
    "image:001": b"..."
}

这时如果随便在 async 代码里用 asyncio.Lock,线程池不受保护;如果全局都用 threading.Lock,async API 又可能被阻塞。

怎么办?

推荐三种方案。


八、方案一:统一用线程安全缓存,async 层通过 to_thread 访问

如果缓存必须被线程池和 async 层共同访问,可以把缓存设计成一个纯同步、线程安全的组件。

import threading
import time
from typing import Any

class ThreadSafeTTLCache:
    def __init__(self, ttl_seconds: int = 300):
        self._data: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl_seconds
        self._lock = threading.RLock()

    def get(self, key: str) -> Any | None:
        now = time.time()

        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None

            expire_at, value = item
            if expire_at < now:
                del self._data[key]
                return None

            return value

    def set(self, key: str, value: Any) -> None:
        expire_at = time.time() + self._ttl

        with self._lock:
            self._data[key] = (expire_at, value)

    def delete(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)

线程池中可以直接使用:

cache = ThreadSafeTTLCache()

def process_image(image_id: str):
    cached = cache.get(image_id)
    if cached is not None:
        return cached

    result = heavy_compute(image_id)
    cache.set(image_id, result)
    return result

异步上下文中不要直接调用可能阻塞的锁竞争逻辑,可以通过 asyncio.to_thread 转到线程中执行:

import asyncio

async def async_get_cache(key: str):
    return await asyncio.to_thread(cache.get, key)

async def async_set_cache(key: str, value):
    await asyncio.to_thread(cache.set, key, value)

API 层:

async def get_image_result(image_id: str):
    cached = await async_get_cache(image_id)
    if cached is not None:
        return cached

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, process_image, image_id)

    return result

这个方案的好处是:共享状态只有一种保护规则:线程锁。

缺点是:async 访问缓存时有线程切换开销。但对于复杂服务来说,这个开销通常比锁模型混乱更可控。


九、方案二:缓存只属于事件循环,线程通过安全通道访问

另一种思路是:明确缓存归属权。

比如规定:

缓存只允许在 async 事件循环中被直接访问。
线程池不能直接读写缓存。
线程池要访问缓存,必须通过事件循环提交请求。

可以使用 asyncio.run_coroutine_threadsafe

import asyncio
from typing import Any

class AsyncCache:
    def __init__(self):
        self._data: dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str):
        async with self._lock:
            return self._data.get(key)

    async def set(self, key: str, value: Any):
        async with self._lock:
            self._data[key] = value

在线程中访问时:

def worker_get_cache(loop, cache: AsyncCache, key: str):
    future = asyncio.run_coroutine_threadsafe(cache.get(key), loop)
    return future.result()

这个方案更适合缓存强依赖 async 生命周期的场景。

但它也有风险:线程会等待事件循环返回结果。如果事件循环又在等待线程池结果,就可能形成循环等待。因此必须谨慎设计调用方向。


十、方案三:把缓存独立成服务或 Actor

在更复杂的系统里,我更推荐把共享状态变成一个“独立所有者”。

示意图:

Async API 层
    |
    | 请求缓存
    v
Cache Manager / Actor
    ^
    | 请求缓存
Thread Pool Worker

缓存不再是一个到处可访问的全局 dict,而是由一个专门组件管理。其他模块通过方法、队列、RPC 或消息传递访问它。

这种思想叫:

不要共享内存来通信。
而是通过通信来共享状态。

在 Python 项目中,可以用:

本地队列
专门的缓存管理线程
Redis
Memcached
数据库缓存表
消息队列

如果是多进程部署,进程内全局缓存本来就不可靠,因为每个进程都有自己的内存副本。此时 Redis 这类外部缓存往往更合适。


十一、最佳实践:锁要短,边界要清晰

无论使用线程锁还是协程锁,都要遵守几个原则。

1. 不要在锁里做慢操作

不推荐:

with lock:
    value = slow_network_call()
    cache[key] = value

推荐:

value = slow_network_call()

with lock:
    cache[key] = value

对于 async 也是一样。

不推荐:

async with lock:
    value = await fetch_data()
    cache[key] = value

推荐:

value = await fetch_data()

async with lock:
    cache[key] = value

当然,这可能带来重复加载问题。如果要避免重复加载,需要引入更精细的 per-key lock 或 singleflight 模式。


2. 锁保护的是不变量

比如缓存的不变量可能是:

_data 中每个 key 对应的 value 必须没有过期。
_hits 和 _misses 的统计必须和访问行为一致。
某个 key 的构建过程不能重复执行。

锁不是为了“看起来安全”,而是为了保护这些不变量不被并发破坏。


3. 避免全局大锁

全局一把锁最简单,但可能限制吞吐。

可以使用 per-key lock:

import threading
from collections import defaultdict

cache = {}
locks = defaultdict(threading.Lock)

def get_or_compute(key):
    with locks[key]:
        if key in cache:
            return cache[key]

        value = compute(key)
        cache[key] = value
        return value

这样不同 key 之间可以并发执行。

但要注意:defaultdict(threading.Lock) 自身在极端并发下也可能需要保护,生产环境可以用更严谨的锁管理器。


4. async 中使用 per-key lock

异步版本:

import asyncio
from collections import defaultdict

cache = {}
locks = defaultdict(asyncio.Lock)

async def get_or_fetch(key):
    async with locks[key]:
        if key in cache:
            return cache[key]

        value = await fetch_value(key)
        cache[key] = value
        return value

这适用于纯 async 场景。
但如果线程池也要访问同一个 cache,这个方案仍然不够,因为 asyncio.Lock 不保护线程。


十二、threading.Lockasyncio.Lock 到底能不能混用?

答案要分层说。

不能把它们当成同一把锁混用

不可以这样理解:

我 async 代码用 asyncio.Lock。
线程代码用 threading.Lock。
它们锁的是同一个 cache,所以应该安全。

这是错误的。

如果两个锁不是同一套互斥机制,就可能出现:

协程拿到了 asyncio.Lock。
线程拿到了 threading.Lock。
二者同时修改同一个 cache。

这等于没有真正互斥。

可以在系统中同时存在,但必须边界清晰

一个服务里当然可以同时使用两种锁:

asyncio.Lock 保护 async 内部状态。
threading.Lock 保护线程共享状态。

但同一份共享数据,最好只归属于一种并发模型。

如果一份数据必须跨线程和协程共享,通常优先选择:

用 threading.Lock 作为底层保护;
async 侧通过 to_thread 或专用适配层访问;
或者彻底改为消息传递 / 外部缓存。

十三、一个推荐模板:混合场景下的缓存访问层

下面是一个更接近生产代码的模板。

import asyncio
import threading
import time
from typing import Callable, TypeVar, Generic

T = TypeVar("T")

class SafeCache(Generic[T]):
    def __init__(self, ttl_seconds: int = 300):
        self._ttl = ttl_seconds
        self._data: dict[str, tuple[float, T]] = {}
        self._lock = threading.RLock()

    def get(self, key: str) -> T | None:
        now = time.time()

        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None

            expire_at, value = item
            if expire_at < now:
                self._data.pop(key, None)
                return None

            return value

    def set(self, key: str, value: T) -> None:
        expire_at = time.time() + self._ttl

        with self._lock:
            self._data[key] = (expire_at, value)

    def get_or_compute(self, key: str, factory: Callable[[], T]) -> T:
        value = self.get(key)
        if value is not None:
            return value

        new_value = factory()
        self.set(key, new_value)
        return new_value

class AsyncCacheAdapter(Generic[T]):
    def __init__(self, cache: SafeCache[T]):
        self._cache = cache

    async def get(self, key: str) -> T | None:
        return await asyncio.to_thread(self._cache.get, key)

    async def set(self, key: str, value: T) -> None:
        await asyncio.to_thread(self._cache.set, key, value)

    async def get_or_compute(self, key: str, factory: Callable[[], T]) -> T:
        return await asyncio.to_thread(self._cache.get_or_compute, key, factory)

使用方式:

cache = SafeCache[bytes](ttl_seconds=600)
async_cache = AsyncCacheAdapter(cache)

def sync_worker(image_id: str) -> bytes:
    return cache.get_or_compute(
        image_id,
        lambda: process_image_sync(image_id)
    )

async def async_handler(image_id: str) -> bytes:
    cached = await async_cache.get(image_id)
    if cached is not None:
        return cached

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_worker, image_id)

这个设计的关键是:

真正的数据结构只由 SafeCache 管。
SafeCache 只使用 threading.RLock。
async 层不直接碰锁,只通过适配器访问。

这比“到处加锁”更清晰。


十四、调试与测试:不要相信感觉,要制造竞争

并发 bug 最可怕的地方是:它们不一定每次复现。

你可以写压力测试:

from concurrent.futures import ThreadPoolExecutor
import asyncio
import random

cache = SafeCache[int]()

def sync_task(i: int):
    key = f"k-{random.randint(1, 10)}"
    cache.set(key, i)
    return cache.get(key)

async def async_task(i: int):
    key = f"k-{random.randint(1, 10)}"
    await asyncio.to_thread(cache.set, key, i)
    return await asyncio.to_thread(cache.get, key)

async def main():
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor(max_workers=8) as pool:
        thread_jobs = [
            loop.run_in_executor(pool, sync_task, i)
            for i in range(1000)
        ]

        async_jobs = [
            async_task(i)
            for i in range(1000)
        ]

        results = await asyncio.gather(*thread_jobs, *async_jobs)
        print(len(results))

asyncio.run(main())

同时观察:

是否出现 KeyError
是否出现 RuntimeError
是否有死锁
是否事件循环卡顿
CPU 是否异常升高
P95 / P99 延迟是否恶化

优秀工程师不会只说“我觉得这样安全”,而是会用测试和监控证明它安全。


十五、给初学者的记忆口诀

可以记住这几句话:

线程锁管线程,协程锁管协程。
asyncio.Lock 不保护线程池。
threading.Lock 可能阻塞事件循环。
不要在 threading.Lock 里 await。
同一份共享状态,最好只有一个所有者。
不确定时,先画出数据流,再决定锁。

这几句话比死背 API 更重要。


十六、前沿视角:未来 Python 并发会更重要

Python 生态正在持续向高性能和多场景并发演进。FastAPI、asyncio、AnyIO、Trio、Pandas、NumPy、PyTorch、任务队列、GPU 计算、边缘设备和 AI 服务,都让开发者越来越频繁地面对混合并发模型。

但趋势越热,越要回到基本功。

真正的 Python 最佳实践不是“所有项目都 async”,也不是“所有地方都加锁”,而是:

理解瓶颈。
明确资源归属。
减少共享状态。
缩小锁范围。
用测试验证假设。

在复杂系统中,技术判断本身就是一种工程温度。你写下的每一把锁,都是未来维护者要理解的一段承诺。


十七、总结:优秀工程师要能把“安全边界”讲清楚

回到最初的问题:

线程安全与协程安全有什么差异?
threading.Lockasyncio.Lock 能混用吗?

最终答案是:

线程安全解决多线程抢占式并发下的共享数据问题。
协程安全解决同一事件循环中多个协程在 await 切换下的共享数据问题。
threading.Lock 和 asyncio.Lock 可以在同一系统中同时存在,但不能随意混用来保护同一份共享状态。

对于“全局缓存同时被线程池和异步上下文访问”这个场景,我更推荐:

方案一:用线程安全缓存作为底层唯一真相,async 层通过 to_thread 访问。
方案二:缓存归 async 事件循环所有,线程通过安全通道提交请求。
方案三:把缓存外置为 Redis / Memcached / 独立缓存服务。

不要让一个全局 dict 成为系统里最脆弱、最隐蔽、最难排查的地方。

Python 编程的高级感,不在于你用了多少新语法,而在于你能不能在复杂场景中写出清晰、可靠、可维护的代码。


互动问题

你在项目中是否遇到过“明明加了锁,还是出现并发 bug”的情况?

你更倾向于在混合并发场景中使用本地缓存,还是直接引入 Redis 这类外部缓存?

欢迎在评论区分享你的经验。很多时候,工程能力就是在这些真实问题里一点点长出来的。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐