基于 FastAPI 与 `redis.asyncio` 的实现分布式锁
一、 跨节点并发的破局:分布式锁的核心诉求
上一章节已明确指出:单机本地锁(asyncio.Lock)的底层本质是进程内内存标志位。一旦系统引入 Nginx 负载均衡横向扩展至多节点,锁监视器物理隔离,一人一单规则将瞬间崩溃。
要彻底阻断分布式环境下的并发竞态,必须将锁的管辖权从应用内存迁移至外部共享状态存储。所有节点必须对同一把“全局锁”进行竞争,才能确保全局串行化执行。
分布式锁的四大核心诉求
| 诉求 | 工程含义 | 缺失后果 |
|---|---|---|
| 互斥性 | 同一时刻仅允许一个客户端持有锁 | 多节点并行执行,业务规则失效 |
| 多进程可见 | 锁状态对集群所有节点透明 | 本地锁无法跨网络同步,各自为政 |
| 高可用与高性能 | 获取/释放锁延迟极低,支持集群容灾 | 数据库行锁替代方案,吞吐量断崖下跌 |
| 安全性 | 锁能自动释放(防死锁),不误删他人锁 | 节点宕机导致永久死锁,或误释放引发并发穿透 |
二、 初级实现:基于 Redis String 结构的分布式锁
Redis 的 SET 命令原生支持 NX(Not Exists,键不存在时设置)与 EX(Expire,设置过期时间)参数。利用这两个特性,可快速构建最基础的分布式锁。
核心逻辑
- 获取锁:
SET lock_key 1 NX EX timeout。若键不存在则写入并返回OK,实现互斥;同时设置过期时间,防止服务宕机导致死锁。 - 释放锁:
DEL lock_key。业务执行完毕后直接删除键,释放互斥权。
import redis.asyncio as aioredis
# 初始化异步 Redis 客户端
redis_client = aioredis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
async def acquire_lock_basic(lock_key: str, timeout: int = 10) -> bool:
"""获取分布式锁"""
# SET key value NX EX timeout
result = await redis_client.set(lock_key, "1", nx=True, ex=timeout)
return result is not None # True 表示获取成功,False 表示锁已被占用
async def release_lock_basic(lock_key: str):
"""释放分布式锁"""
await redis_client.delete(lock_key)
基础时序推演
三、 缺陷分析:误删危机与改进演进
初级实现虽能实现基础互斥,但在生产高并发与异常场景下,暴露出致命的安全性缺陷。
3.1 锁误删问题(安全性崩塌)
场景还原
节点1获取锁后,因下游依赖响应缓慢或执行复杂聚合查询,导致业务耗时 12 秒。而获取锁时设置的 TTL 仅为 10 秒。
原因分析
获取锁的客户端与锁的生命周期完全解耦。初级版释放锁时不校验持有者身份,盲目执行 DEL。当业务耗时 > TTL 时,锁已被 Redis 自动清理并分配给新客户端,原客户端的 DEL 实际上删除的是他人的锁,互斥机制形同虚设。
3.2 改进一:引入线程标识防误删
核心思路
获取锁时存入唯一线程标识(如 UUID),释放锁前先 GET 锁的值,判断是否与当前标识一致。一致则删除,不一致则放弃。
import uuid
async def acquire_lock_v2(lock_key: str, timeout: int = 10) -> str | None:
request_id = str(uuid.uuid4())
# 将 UUID 作为 Value 存入
result = await redis_client.set(lock_key, request_id, nx=True, ex=timeout)
return request_id if result else None
async def release_lock_v2(lock_key: str, request_id: str):
current_value = await redis_client.get(lock_key)
if current_value == request_id:
await redis_client.delete(lock_key)
效果评估
该改进有效阻断了“无脑释放他人锁”的常规误删场景,在业务耗时可控、网络稳定的情况下,能保障大多数场景的安全性。
3.3 极端条件:非原子操作的隐患
尽管引入了标识校验,但在极端网络抖动、GC 停顿或协程调度延迟下,GET 与 DEL 两步操作仍存在时间窗口,无法保证绝对安全。
深度分析
- Check-Then-Act 陷阱:
GET(检查)与DEL(执行)分离。在分布式网络环境中,任何微秒级的延迟都可能导致判断时的状态与执行时的状态不一致。 - TTL 自动释放的并发穿插:Redis 的惰性删除或定期扫描可能在
GET之后、DEL之前触发,导致锁状态瞬间翻转。 - 结论:多步 Redis 命令必须合并为单条原子指令执行,否则任何外部干扰都会破坏锁的绝对安全性。
四、 原子性保障:Lua 脚本与防误删机制
Redis 提供 Lua 脚本引擎,确保脚本内多条命令在执行时具备原子性(不会被其他客户端命令插入打断)。结合脚本,可彻底解决非原子操作导致的误删危机。
4.1 Lua 脚本语法与 Redis 集成
Lua 脚本通过 redis.call() 执行 Redis 命令,通过 KEYS 和 ARGV 接收参数。Redis 采用单线程事件循环模型,执行 Lua 脚本期间会阻塞其他命令,保证脚本整体串行执行。
-- 释放锁的 Lua 脚本逻辑
local lock_key = KEYS[1]
local request_id = ARGV[1]
-- 1. 获取当前锁的持有者标识
local current_id = redis.call('GET', lock_key)
-- 2. 判断是否为自己持有
if current_id == request_id then
-- 3. 一致则删除锁(原子操作)
return redis.call('DEL', lock_key)
else
-- 4. 不一致则不操作,返回 0
return 0
end
4.2 redis.asyncio 调用 Lua 脚本
UNLOCK_LUA = """
local current_id = redis.call('GET', KEYS[1])
if current_id == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
async def release_lock_with_lua(lock_key: str, request_id: str):
"""通过 Lua 脚本原子释放锁"""
await redis_client.eval(UNLOCK_LUA, 1, lock_key, request_id)
原子性保障解析EVAL 命令将脚本整体提交至 Redis 服务端。Redis 保证脚本内所有命令在单线程内连续执行,彻底消除 GET 与 DEL 之间的竞态窗口。极端条件下的误删问题迎刃而解,分布式锁的安全性得到生产级保障。
五、 基于 redis.asyncio 攻克分布式锁四大缺陷
基础 SET NX EX + Lua 释放虽能保障基础互斥与防误删,但在真实企业级秒杀场景中,仍面临四大核心架构缺陷。redis.asyncio 作为官方生产级异步客户端,并未内置高级锁对象,但提供了完整的底层通信原语与原子命令接口。开发者可通过组合异步调度、Hash 结构与 Lua 脚本,精准复刻工业级分布式锁的核心机制。
缺陷一:不可重入(锁的自我死锁)
问题描述
同一协程(或同一工作线程)若递归调用或多次尝试获取同一把锁,会因 NX(键已存在)直接失败,导致自身业务阻塞,形成死锁。
架构实现思路:基于 Hash 结构的重入计数
摒弃 String 结构,改用 Redis Hash。以 lock_key 为外层,以 request_id 为 field,以 重入次数 为 value。利用 redis.asyncio 的 eval 方法保证 Hash 操作的原子性。
- 首次获取:
HSETNX设置 field 为 1,并EXPIRE设置 TTL。 - 重入获取:
HEXISTS判断 field 存在 →HINCRBY计数器 +1 →EXPIRE刷新 TTL。 - 释放锁:
HINCRBY计数器 -1。若 >0,仅刷新 TTL;若 =0,DEL彻底删除锁。
# 重入获取锁 Lua 脚本核心逻辑(依赖 redis.asyncio.eval 原子执行)
ACQUIRE_REENTRANT_LUA = """
local key = KEYS[1]
local req_id = ARGV[1]
local ttl = ARGV[2]
if redis.call('EXISTS', key) == 0 then
redis.call('HSET', key, req_id, '1')
redis.call('EXPIRE', key, ttl)
return 1
end
if redis.call('HEXISTS', key, req_id) == 1 then
redis.call('HINCRBY', key, req_id, 1)
redis.call('EXPIRE', key, ttl)
return 1
end
return 0
"""
缺陷二:不可重试(流量瞬间丢弃)
问题描述
基础锁尝试一次失败直接返回 False。业务层需自行编写轮询逻辑,若采用固定间隔高频重试,极易引发惊群效应,打满 Redis 与数据库。
架构实现思路:异步指数退避与信号唤醒
结合 asyncio.sleep 实现指数退避重试,并利用 redis.asyncio 的 Pub/Sub 订阅能力监听锁释放事件,避免无效空转。
- 主流程:获取失败 →
await asyncio.sleep(base_delay * 2 ** retry_count)→ 重试。 - 唤醒优化:锁释放时通过
PUBLISH发送信号。等待协程通过SUBSCRIBE监听,收到信号后优先重试,大幅降低延迟与资源消耗。
async def try_lock_with_backoff(lock_key: str, ttl: int = 10, max_retries: int = 5):
request_id = str(uuid.uuid4())
for i in range(max_retries):
# 依赖 redis.asyncio.eval 执行重入 Lua 脚本
ok = await redis_client.eval(ACQUIRE_REENTRANT_LUA, 1, lock_key, request_id, str(ttl))
if ok == 1:
return request_id
# 指数退避:0.05s -> 0.1s -> 0.2s -> 0.4s
await asyncio.sleep(0.05 * (2 ** i))
return None
缺陷三:超时释放(业务耗时不可控引发并发穿透)
问题描述
固定 TTL 无法适应动态业务耗时。若秒杀链路因网络波动、下游依赖降级导致执行时间远超 TTL,锁会被 Redis 自动删除,后续请求涌入,引发并发穿透。
架构实现思路:独立协程动态续期
借鉴后台守护线程思想,利用 asyncio.create_task 启动独立协程。每隔 TTL / 3 时间通过 redis.asyncio.hexists 检测锁是否仍被当前 request_id 持有。若是,执行 EXPIRE 续期;若业务已释放或标识不匹配,则安全终止。
async def start_watchdog(lock_key: str, request_id: str, ttl: int = 10):
async def _renew_loop():
while True:
await asyncio.sleep(ttl / 3)
# 依赖 redis.asyncio.hexists 检查锁持有状态
is_held = await redis_client.hexists(lock_key, request_id)
if is_held:
await redis_client.expire(lock_key, ttl) # 动态续期
else:
break # 锁已释放或不属于自己,终止续期
return asyncio.create_task(_renew_loop())
缺陷四:主从一致性(Redis 异步复制导致的多头获取)
问题描述
Redis 主从架构采用异步复制。主节点获取锁后,若在主从同步完成前宕机,从节点晋升为主,锁数据未同步。此时多个客户端可能同时在新的主节点上获取到同一把锁,破坏互斥性。
架构实现思路:主从一致性的工程权衡redis.asyncio 客户端本身不感知主从异步复制延迟,需由架构层制定容灾策略:
- AP 模式(业务容忍):生产环境通常接受短暂的不一致。通过较短 TTL + 业务层幂等设计(如数据库唯一订单索引)兜底。即使并发穿透,数据库层通过唯一约束拦截重复写入,保障最终一致。
- CP 模式(Redlock 算法):向 N 个完全独立的 Redis 实例并发请求锁。仅当在
N/2 + 1个节点获取成功,且总耗时< TTL - 容错窗口时,视为锁成功。redis.asyncio可通过asyncio.gather()并发请求多个独立连接池实现,但会牺牲部分延迟与运维复杂度,需按资损容忍度决策。
六、 完整落地:分布式锁双方案实现与工程选型
基于前述可重入、防误删、动态续期与智能重试机制,结合 FastAPI 与 SQLAlchemy 异步架构,提供两种分布式锁实现路径。两者在底层数据结构、资源开销与适用边界上存在明确差异,需依据业务特征进行技术选型。
方案一:官方内置 redis.asyncio.lock.Lock
适用边界:业务执行耗时固定且较短(<5 秒)、无嵌套调用需求、允许固定间隔重试、非核心交易链路。
import asyncio
import logging
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
import redis.asyncio as aioredis
from common.database import get_db
from module_seckill.model import SeckillVoucher, VoucherOrder
logger = logging.getLogger(__name__)
app = FastAPI()
redis_client = aioredis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
@app.post("/voucher-order/seckill/{voucher_id}")
async def seckill_voucher_simple(voucher_id: int, user_id: int, db: AsyncSession = Depends(get_db)):
# 1. 创建内置锁实例(自动管理 token、原子获取/释放)
lock = redis_client.lock(
f"order:{user_id}:{voucher_id}",
timeout=10, # 锁自动释放时间
blocking_timeout=2 # 获取锁的最大等待时间
)
# 2. 尝试获取锁
if not await lock.acquire():
raise HTTPException(status_code=429, detail="System busy, please try later")
try:
# 3. 一人一单校验(此时全局串行)
result = await db.execute(
select(func.count(VoucherOrder.id)).where(
VoucherOrder.user_id == user_id, VoucherOrder.voucher_id == voucher_id
)
)
if result.scalar() > 0:
raise HTTPException(status_code=400, detail="User has already purchased")
# 4. 扣减库存 & 创建订单
voucher = await db.get(SeckillVoucher, voucher_id)
if not voucher or voucher.stock <= 0:
raise HTTPException(status_code=400, detail="Out of stock")
voucher.stock -= 1
db.add(voucher)
order = VoucherOrder(user_id=user_id, voucher_id=voucher_id)
db.add(order)
await db.commit()
return {"order_id": order.id, "status": "success"}
except Exception as e:
await db.rollback()
logger.error(f"Seckill failed: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# 5. 确保锁释放(内置锁自动校验 token,防误删)
await lock.release()
特性
- 原子性保障:底层封装
SET key token NX PX timeout单条命令,规避SETNX与EXPIRE分离的竞态窗口。 - 防误删机制:释放锁时内置 Lua 脚本校验 Token,确保仅持有者可执行
DEL操作。 - 阻塞重试:通过
blocking_timeout参数控制等待周期,内部采用固定间隔轮询。
能力边界
- 不支持可重入:基于 String 结构实现,同一协程二次尝试获取将因
NX约束直接失败,可能引发协程自阻塞。 - 固定生命周期:依赖初始化时传入的
timeout,缺乏后台续期机制。业务耗时超出预设阈值将触发自动释放,存在并发穿透风险。 - 重试策略单一:未内置指数退避或信号量唤醒,高并发场景下易产生无效轮询,增加 Redis 节点负载。
方案二:生产级自定义 DistributedLock
适用边界:业务链路长、可能重入、需动态续期、高并发重试、强一致性兜底。
import asyncio
import uuid
import logging
from typing import Optional
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from datetime import datetime
import redis.asyncio as aioredis
from common.database import get_db
from module_seckill.model import SeckillVoucher, VoucherOrder
logger = logging.getLogger(__name__)
app = FastAPI()
redis_client = aioredis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
# Lua 脚本常量(获取锁 & 释放锁)
ACQUIRE_LOCK_LUA = """
local key = KEYS[1]
local req_id = ARGV[1]
local ttl = ARGV[2]
if redis.call('EXISTS', key) == 0 then
redis.call('HSET', key, req_id, '1')
redis.call('EXPIRE', key, ttl)
return 1
end
if redis.call('HEXISTS', key, req_id) == 1 then
redis.call('HINCRBY', key, req_id, 1)
redis.call('EXPIRE', key, ttl)
return 1
end
return 0
"""
RELEASE_LOCK_LUA = """
local key = KEYS[1]
local req_id = ARGV[1]
if redis.call('HEXISTS', key, req_id) == 0 then
return nil
end
local count = redis.call('HINCRBY', key, req_id, -1)
if count > 0 then
redis.call('EXPIRE', key, ARGV[2])
return nil
else
redis.call('DEL', key)
return 1
end
"""
class DistributedLock:
def __init__(self, redis: aioredis.Redis, key: str, ttl: int = 10):
self.redis = redis
self.key = f"lock:{key}"
self.ttl = ttl
self.request_id = str(uuid.uuid4())
self.watchdog_task = None
async def acquire(self, retry: int = 3) -> bool:
for i in range(retry):
ok = await self.redis.eval(ACQUIRE_LOCK_LUA, 1, self.key, self.request_id, str(self.ttl))
if ok == 1:
# 获取成功,启动看门狗
self.watchdog_task = asyncio.create_task(self._watchdog())
return True
await asyncio.sleep(0.1 * (2 ** i)) # 指数退避重试
return False
async def release(self):
if self.watchdog_task:
self.watchdog_task.cancel()
await self.redis.eval(RELEASE_LOCK_LUA, 1, self.key, self.request_id, str(self.ttl))
async def _watchdog(self):
while True:
await asyncio.sleep(self.ttl / 3)
held = await self.redis.hexists(self.key, self.request_id)
if held:
await self.redis.expire(self.key, self.ttl) # 动态续期
else:
break
@app.post("/voucher-order/seckill/{voucher_id}")
async def seckill_voucher_distributed(voucher_id: int, user_id: int, db: AsyncSession = Depends(get_db)):
lock_key = f"order:{user_id}:{voucher_id}"
lock = DistributedLock(redis_client, lock_key, ttl=10)
# 1. 尝试获取分布式锁
if not await lock.acquire(retry=2):
raise HTTPException(status_code=429, detail="System busy, please try later")
try:
# 2. 一人一单校验(此时全局串行,无并发竞态)
result = await db.execute(
select(func.count(VoucherOrder.id)).where(
VoucherOrder.user_id == user_id, VoucherOrder.voucher_id == voucher_id
)
)
if result.scalar() > 0:
raise HTTPException(status_code=400, detail="User has already purchased")
# 3. 扣减库存 & 创建订单
voucher = await db.get(SeckillVoucher, voucher_id)
if not voucher or voucher.stock <= 0:
raise HTTPException(status_code=400, detail="Out of stock")
voucher.stock -= 1
db.add(voucher)
order = VoucherOrder(user_id=user_id, voucher_id=voucher_id)
db.add(order)
await db.commit()
return {"order_id": order.id, "status": "success"}
except Exception as e:
await db.rollback()
logger.error(f"Seckill failed: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# 4. 确保锁释放
await lock.release()
架构特性
- 可重入设计:采用 Hash 结构记录
request_id与计数器,支持同一协程多次安全获取与计数器自增。 - 动态续期(WatchDog):通过
asyncio.create_task启动独立协程,按TTL / 3周期检测锁持有状态并执行EXPIRE刷新,规避长链路业务导致的锁提前释放。 - 智能重试:获取失败时执行指数退避(
0.1s -> 0.2s -> 0.4s),有效抑制惊群效应。 - 原子释放:释放脚本通过
HEXISTS校验归属,执行HINCRBY递减计数,仅当计数归零时执行DEL彻底清理。
成本
- 维护复杂度:需独立管理 Lua 脚本生命周期、后台协程调度与异常捕获逻辑。
- 排查门槛:分布式锁竞态、续期异常或协程泄漏需依赖全链路日志、Redis 慢查询监控与协程 Profiling 工具联合定位。
架构演进结论
内置 Lock 提供分布式锁的基础互斥能力,适用于短链路、低并发的常规场景。自定义 DistributedLock 通过组合 Hash 结构、Lua 原子脚本与异步协程调度,补齐可重入、动态续期与智能重试能力,适用于长链路、高并发的核心交易场景。在后续 Redis 前置库存拦截与异步消息队列架构中,该自定义锁类将作为底层并发控制组件直接复用,支撑万级 QPS 下的串行化拦截与数据一致性保障。
七、总结
至此,基于 redis.asyncio 的分布式锁已完美解决单机与集群模式下的一人一单并发安全问题。然而,将数据库校验、库存扣减、订单落库全部包裹在分布式锁内,仍存在不可忽视的架构瓶颈:
- 锁粒度与吞吐量矛盾:分布式锁将并发请求强制串行化。万级 QPS 下,数据库连接池与行级更新仍会成为绝对瓶颈,大量请求在锁外排队,响应延迟呈指数级上升。
- 同步链路阻塞:用户请求需等待锁获取、DB 查询、事务提交全流程完成。任何一环网络抖动,都将直接拖垮接口 RT(Response Time)。
- Redis 单点写入压力:高频锁竞争导致 Redis 内存操作密集,虽快于 DB,但仍需承担全量请求的协调开销。
秒杀系统的终极形态绝非“用分布式锁包裹所有同步逻辑”,而是前置拦截 + 异步削峰 + 最终一致。如何彻底解放数据库?如何将同步阻塞转化为异步流式处理?我们下一章实现。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)