你知道群聊里的「文件共享、审核」是怎么实现的吗?

一个实习生的 BBS 社区内容审核开发实录:从对象存储、AI 审核、到实时推送,再到消息队列的一次完整演进。

技术栈:Python · FastAPI · PostgreSQL · Azure Blob Store · Azure Content Safety · Centrifugo · Docker · Svelte


🤔 先从一个问题开始

你有没有想过,当你在群聊里发了一张图,对方为什么几乎是瞬间就能看到

这张图去哪了?谁在审核它?它是直接从你手机传到对方手机的吗?

不是的。背后有一整套系统在悄悄工作——而我最近刚好亲手搭了一个类似的东西:一个 BBS 社区的文件上传与内容审核系统。这篇文章就是想聊聊它的完整设计思路,以及我在架构演进中踩过的那些坑。


📦 第一关:图片去哪了?

很多人以为文件是"点对点传输"的,就像蓝牙那样。

实际上不是。

你上传的图片,会先飞到某个对象存储里。你可以把它理解成一个超级大的网盘,每个文件有一个唯一的地址(Key)。我用的是 Azure Blob Store。

用户上传图片
    ↓
FastAPI 后端
    ↓
Azure Blob Store(文件永久存在这里)
    ↓
数据库只记录这个文件的 Key,不存图片本身

为什么数据库不直接存图片?

把图片转成 base64 存进数据库是个经典的新手陷阱——数据库会越来越重,每次查询都在搬运不必要的大体积数据。正确做法是:数据库只存文件的"地址"(Key),读取时通过 Key 生成一个有时效的访问链接(预签名 URL)给前端。

这样数据库保持轻盈,图片的读写压力全交给专门的存储服务。


🚨 第二关:谁在审核这张图?

问题来了:如果有人上传了违规内容怎么办?

总不能让人工一张一张看吧?

这里就需要 AI 内容审核。我用的是 Azure Content Safety,它可以识别图片里是否包含暴力、色情、仇恨言论等内容,返回一个风险评分。

# 调用 Azure Content Safety 审核图片
result = content_safety_client.analyze_image(
    AnalyzeImageOptions(image=ImageData(content=image_bytes))
)
if result.categories_analysis[0].severity > THRESHOLD:
    flag_as_violation(task_id)

但问题又来了——审核需要时间,用户上传完总不能一直盯着转圈圈等吧?


⚡ 第三关:审核结果怎么实时通知用户?

这是整个系统里最有意思的部分。

方案 A(我没选):Azure Functions

文件上传 → 触发 Azure Functions → 审核完发通知。

听起来很美,但实际上有几个问题:

  • 冷启动延迟:函数半天没调用就"睡着了",下次触发要等它醒来
  • 状态难管理:Serverless 是无状态的,审核流程涉及重试逻辑时很难处理
  • 本地调试痛苦:本地模拟环境和云上行为经常对不上

方案 B(我选的):Worker 容器 + Centrifugo

用户上传文件
    ↓
FastAPI 后端把"待审核任务"写入 task_queue 表
    ↓
Worker 容器(持续运行的 Docker 容器)消费任务
    ↓
调用 Azure Content Safety 审核
    ↓
通过 Centrifugo 把结果实时推送给 Svelte 前端
    ↓
用户看到「审核通过 ✅」或「内容违规 ❌」

Centrifugo 是什么? 你可以把它理解成"服务器主动给浏览器发消息的工具"。前端通过 WebSocket 保持长连接,后端一有结果就立刻推过去,用户完全无感知地收到通知,不需要前端反复问"好了吗?好了吗?"


🗄️ 第四关:任务队列怎么做?

我没有引入 Redis 或 RabbitMQ,原因很简单:这是个早期 BBS 社区,我需要轻量化

所以我用 PostgreSQL 的 task_queue 表模拟了消息队列:

# notification_worker.py
async def run():
    while running:
        task = await TaskQueueService.dequeue_next_task(db)
        if task:
            await process(task)
        else:
            await asyncio.sleep(2)  # 没任务就等 2 秒再试
# TaskQueueService
async def dequeue_next_task(db):
    return db.execute("""
        SELECT * FROM task_queue
        WHERE status IN ('pending', 'failed')
          AND next_run_at <= NOW()
        LIMIT 1
        FOR UPDATE SKIP LOCKED  -- 防止多个 Worker 抢同一个任务
    """)

FOR UPDATE SKIP LOCKED 是关键——它确保即使未来横向扩展多个 Worker,也不会出现重复消费同一个任务的问题。

但这套方案有一个真实的痛点:sleep(2) 的空转浪费。低峰期每 2 秒跑一次空查询,高峰期任务来了也要等最多 2 秒才被发现。


🚀 第五关:用 PostgreSQL LISTEN/NOTIFY 升级唤醒机制

不引入任何新组件,PgSQL 原生就支持发布订阅。

核心思路:NOTIFY 只是唤醒信号,任务真相仍然是 task_queue 表。

第一步:加一个触发器

-- 有新任务入队时,数据库自动发出通知
CREATE OR REPLACE FUNCTION notify_new_task()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('new_task', NEW.id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_queue_notify
AFTER INSERT ON task_queue
FOR EACH ROW EXECUTE FUNCTION notify_new_task();

第二步:Worker 改为监听通知

# 专门开一个长连接监听 new_task 频道
listen_conn = await asyncpg.connect(DSN)
await listen_conn.execute("LISTEN new_task;")

async def on_notify(conn, pid, channel, payload):
    # 收到通知,立刻去 drain 队列
    for _ in range(MAX_DRAIN):  # 加上限防止 listener 被占用过久
        task = await dequeue_next_task()
        if not task:
            break
        await process(task)

await listen_conn.add_listener('new_task', on_notify)

第三步:保留一个慢速兜底轮询

# 防止 listener 短暂断线期间漏掉任务
async def fallback_poll():
    while running:
        await asyncio.sleep(30)  # 30 秒一次,纯兜底
        await drain_queue()

最终架构

有新任务写入 task_queue
        ↓
触发器发出 pg_notify('new_task')
        ↓
Worker 的 LISTEN 连接立刻被唤醒
        ↓
立刻调用 dequeue_next_task(SKIP LOCKED) drain 队列
        ↓
(万一 listener 断线)30 秒兜底轮询捞漏网任务

这套方案你能得到什么

原方案 升级后
额外依赖
任务响应延迟 最多 2 秒 毫秒级
低峰期空转 每 2 秒一次空查询 完全无空转
任务可靠性 ✅(NOTIFY 只是信号,真相在表里)
并发安全 ✅ SKIP LOCKED ✅ SKIP LOCKED
容灾能力 重启丢不了任务 重启丢不了任务 + 断线有兜底

🐳 整个系统长什么样

# docker-compose(简化版)
services:
  api:        # FastAPI 主服务
  worker:     # 审核 Worker(LISTEN + 兜底轮询)
  centrifugo: # 实时推送服务
  db:         # PostgreSQL(task_queue 在这里)

本地一条 docker compose up 全部拉起,和云上行为基本一致,调试非常友好。

完整数据流

[Svelte 前端]
    │  上传文件                WebSocket 收到实时通知 ←─────────────┐
    ↓                                                              │
[FastAPI 后端] ──写入──→ [task_queue 表] ──pg_notify──→ [Worker 容器]
    │                                                      │
    └──上传文件──→ [Azure Blob Store]       [Azure Content Safety]
                                                           │
                                                    [Centrifugo] ──推送──→ 前端

💬 最后说几句

我还是个实习生,这套架构不一定是最优解。

但它满足了我对这个阶段最重要的几个要求:能跑、可调试、不引入不必要的依赖、有足够的可靠性

如果有一天并发量真的上来了,Worker 需要横向扩展到多台机器,那时候再上 Redis 也完全来得及。架构是演进出来的,不是一开始就设计出来的。

下一篇打算写:预签名 URL 是什么?为什么前端绝对不能直接拿 AK/SK 上传文件?


如果你也在做类似的社区产品,欢迎评论区交流。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐