你知道群聊里的「文件共享、审核」是怎么实现的吗?
你知道群聊里的「文件共享、审核」是怎么实现的吗?
一个实习生的 BBS 社区内容审核开发实录:从对象存储、AI 审核、到实时推送,再到消息队列的一次完整演进。
技术栈:Python · FastAPI · PostgreSQL · Azure Blob Store · Azure Content Safety · Centrifugo · Docker · Svelte
🤔 先从一个问题开始
你有没有想过,当你在群聊里发了一张图,对方为什么几乎是瞬间就能看到?
这张图去哪了?谁在审核它?它是直接从你手机传到对方手机的吗?
不是的。背后有一整套系统在悄悄工作——而我最近刚好亲手搭了一个类似的东西:一个 BBS 社区的文件上传与内容审核系统。这篇文章就是想聊聊它的完整设计思路,以及我在架构演进中踩过的那些坑。
📦 第一关:图片去哪了?
很多人以为文件是"点对点传输"的,就像蓝牙那样。
实际上不是。
你上传的图片,会先飞到某个对象存储里。你可以把它理解成一个超级大的网盘,每个文件有一个唯一的地址(Key)。我用的是 Azure Blob Store。
用户上传图片
↓
FastAPI 后端
↓
Azure Blob Store(文件永久存在这里)
↓
数据库只记录这个文件的 Key,不存图片本身
为什么数据库不直接存图片?
把图片转成 base64 存进数据库是个经典的新手陷阱——数据库会越来越重,每次查询都在搬运不必要的大体积数据。正确做法是:数据库只存文件的"地址"(Key),读取时通过 Key 生成一个有时效的访问链接(预签名 URL)给前端。
这样数据库保持轻盈,图片的读写压力全交给专门的存储服务。
🚨 第二关:谁在审核这张图?
问题来了:如果有人上传了违规内容怎么办?
总不能让人工一张一张看吧?
这里就需要 AI 内容审核。我用的是 Azure Content Safety,它可以识别图片里是否包含暴力、色情、仇恨言论等内容,返回一个风险评分。
# 调用 Azure Content Safety 审核图片
result = content_safety_client.analyze_image(
AnalyzeImageOptions(image=ImageData(content=image_bytes))
)
if result.categories_analysis[0].severity > THRESHOLD:
flag_as_violation(task_id)
但问题又来了——审核需要时间,用户上传完总不能一直盯着转圈圈等吧?
⚡ 第三关:审核结果怎么实时通知用户?
这是整个系统里最有意思的部分。
方案 A(我没选):Azure Functions
文件上传 → 触发 Azure Functions → 审核完发通知。
听起来很美,但实际上有几个问题:
- 冷启动延迟:函数半天没调用就"睡着了",下次触发要等它醒来
- 状态难管理:Serverless 是无状态的,审核流程涉及重试逻辑时很难处理
- 本地调试痛苦:本地模拟环境和云上行为经常对不上
方案 B(我选的):Worker 容器 + Centrifugo
用户上传文件
↓
FastAPI 后端把"待审核任务"写入 task_queue 表
↓
Worker 容器(持续运行的 Docker 容器)消费任务
↓
调用 Azure Content Safety 审核
↓
通过 Centrifugo 把结果实时推送给 Svelte 前端
↓
用户看到「审核通过 ✅」或「内容违规 ❌」
Centrifugo 是什么? 你可以把它理解成"服务器主动给浏览器发消息的工具"。前端通过 WebSocket 保持长连接,后端一有结果就立刻推过去,用户完全无感知地收到通知,不需要前端反复问"好了吗?好了吗?"
🗄️ 第四关:任务队列怎么做?
我没有引入 Redis 或 RabbitMQ,原因很简单:这是个早期 BBS 社区,我需要轻量化。
所以我用 PostgreSQL 的 task_queue 表模拟了消息队列:
# notification_worker.py
async def run():
while running:
task = await TaskQueueService.dequeue_next_task(db)
if task:
await process(task)
else:
await asyncio.sleep(2) # 没任务就等 2 秒再试
# TaskQueueService
async def dequeue_next_task(db):
return db.execute("""
SELECT * FROM task_queue
WHERE status IN ('pending', 'failed')
AND next_run_at <= NOW()
LIMIT 1
FOR UPDATE SKIP LOCKED -- 防止多个 Worker 抢同一个任务
""")
FOR UPDATE SKIP LOCKED 是关键——它确保即使未来横向扩展多个 Worker,也不会出现重复消费同一个任务的问题。
但这套方案有一个真实的痛点:sleep(2) 的空转浪费。低峰期每 2 秒跑一次空查询,高峰期任务来了也要等最多 2 秒才被发现。
🚀 第五关:用 PostgreSQL LISTEN/NOTIFY 升级唤醒机制
不引入任何新组件,PgSQL 原生就支持发布订阅。
核心思路:NOTIFY 只是唤醒信号,任务真相仍然是 task_queue 表。
第一步:加一个触发器
-- 有新任务入队时,数据库自动发出通知
CREATE OR REPLACE FUNCTION notify_new_task()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('new_task', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER task_queue_notify
AFTER INSERT ON task_queue
FOR EACH ROW EXECUTE FUNCTION notify_new_task();
第二步:Worker 改为监听通知
# 专门开一个长连接监听 new_task 频道
listen_conn = await asyncpg.connect(DSN)
await listen_conn.execute("LISTEN new_task;")
async def on_notify(conn, pid, channel, payload):
# 收到通知,立刻去 drain 队列
for _ in range(MAX_DRAIN): # 加上限防止 listener 被占用过久
task = await dequeue_next_task()
if not task:
break
await process(task)
await listen_conn.add_listener('new_task', on_notify)
第三步:保留一个慢速兜底轮询
# 防止 listener 短暂断线期间漏掉任务
async def fallback_poll():
while running:
await asyncio.sleep(30) # 30 秒一次,纯兜底
await drain_queue()
最终架构
有新任务写入 task_queue
↓
触发器发出 pg_notify('new_task')
↓
Worker 的 LISTEN 连接立刻被唤醒
↓
立刻调用 dequeue_next_task(SKIP LOCKED) drain 队列
↓
(万一 listener 断线)30 秒兜底轮询捞漏网任务
这套方案你能得到什么
| 原方案 | 升级后 | |
|---|---|---|
| 额外依赖 | 无 | 无 |
| 任务响应延迟 | 最多 2 秒 | 毫秒级 |
| 低峰期空转 | 每 2 秒一次空查询 | 完全无空转 |
| 任务可靠性 | ✅ | ✅(NOTIFY 只是信号,真相在表里) |
| 并发安全 | ✅ SKIP LOCKED | ✅ SKIP LOCKED |
| 容灾能力 | 重启丢不了任务 | 重启丢不了任务 + 断线有兜底 |
🐳 整个系统长什么样
# docker-compose(简化版)
services:
api: # FastAPI 主服务
worker: # 审核 Worker(LISTEN + 兜底轮询)
centrifugo: # 实时推送服务
db: # PostgreSQL(task_queue 在这里)
本地一条 docker compose up 全部拉起,和云上行为基本一致,调试非常友好。
完整数据流
[Svelte 前端]
│ 上传文件 WebSocket 收到实时通知 ←─────────────┐
↓ │
[FastAPI 后端] ──写入──→ [task_queue 表] ──pg_notify──→ [Worker 容器]
│ │
└──上传文件──→ [Azure Blob Store] [Azure Content Safety]
│
[Centrifugo] ──推送──→ 前端
💬 最后说几句
我还是个实习生,这套架构不一定是最优解。
但它满足了我对这个阶段最重要的几个要求:能跑、可调试、不引入不必要的依赖、有足够的可靠性。
如果有一天并发量真的上来了,Worker 需要横向扩展到多台机器,那时候再上 Redis 也完全来得及。架构是演进出来的,不是一开始就设计出来的。
下一篇打算写:预签名 URL 是什么?为什么前端绝对不能直接拿 AK/SK 上传文件?
如果你也在做类似的社区产品,欢迎评论区交流。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)