从双模式架构谈技术选型:为什么我的系统后端选了 Flask 而不是 FastAPI

不是 FastAPI 不好,而是 Flask 在这个项目的约束条件下,是阻力最小的路径。
项目地址:AI Academic Polisher


目录


§ 1 背景:这不是一个典型的 API 服务

项目 AI Academic Polisher 是一个学术论文润色系统,核心功能是把 AI 生成的"机器味"文本改写成符合人类学术写作习惯的表达,从而降低 AIGC 检测率。

它有一个不太常见的架构约束:一套代码要同时跑在两种完全不同的环境里

模式 运行环境 基础设施 用户画像
Server Linux + Docker Flask + Gunicorn + MySQL + Redis + RQ Worker 技术人员,多用户共享
Desktop Windows EXE Flask (threaded) + SQLite + MemoryRedis + MemoryQueue 非技术用户,双击即用

这意味着后端框架的选型不能只看"谁性能高"或"谁更现代",而要看谁能在这两种截然不同的运行时环境下都稳定工作,且切换成本最低


§ 2 双模式架构的核心约束

在做技术选型之前,先明确这个项目的几个硬约束:

  1. Desktop 模式必须打包成单个 EXE:用 PyInstaller 打包,用户双击就能跑,不装 Python、不装数据库、不装 Redis
  2. 同步 AI 调用为主:使用 OpenAI 兼容 API(支持官方、代理、Ollama),SDK 的同步版本基于 httpx/requests
  3. 线程池并发:长文档的多段落并发用 ThreadPoolExecutor,不是 asyncio
  4. SSE 实时推送:进度通过 Server-Sent Events 推给前端,需要 HTTP 长连接
  5. Flask 应用上下文:数据库操作依赖 app_context(),跨线程时需要手动注入

这些约束叠加在一起,指向了一个结论:这个项目的并发模型是"同步 + 线程池",不是"异步 + 事件循环"


§ 3 PyInstaller 打包:Flask 的生态优势

Desktop 模式的核心交付物是一个 Windows EXE。PyInstaller 是 Python 桌面打包的事实标准,而 Flask 在这个生态里的兼容性远好于 FastAPI。

Flask 打包的现状

Flask 本身是纯同步框架,依赖链简单:Werkzeug + Jinja2 + click。PyInstaller 对这些包的 hook 支持成熟,社区踩坑经验丰富。我的项目里需要手动声明的 hiddenimports 主要来自 SQLAlchemy 和 OpenAI SDK,Flask 本身几乎不需要额外处理。

FastAPI 打包的潜在问题

FastAPI 的依赖链更长:uvicornuvloop(Linux only)/ asynciohttptoolsstarlette。其中:

  • uvloop 不支持 Windows:FastAPI 在 Windows 上只能用 asyncio 的默认事件循环,性能优势直接消失
  • uvicorn 的多进程模型uvicorn --workers N 在 PyInstaller 打包后行为不可预测,因为 multiprocessing.freeze_support() 的兼容性问题
  • C 扩展依赖httptoolswebsockets 等包含编译的 C 扩展,PyInstaller 打包时需要额外处理 DLL 依赖

Flask 的 app.run(threaded=True) 在 Desktop 模式下就是一个简单的多线程 WSGI 服务器,没有事件循环、没有 C 扩展、没有多进程——打包后的行为和开发时完全一致

# main.py — Desktop 模式启动,简单到不需要解释
if Config.DEPLOY_MODE == 'desktop':
    task_queue.start_worker(app)
    app.run(host='127.0.0.1', port=Config.APP_PORT, threaded=True)

§ 4 同步 OpenAI SDK + ThreadPoolExecutor:够用就是最优解

项目的核心 IO 操作是调用 AI API。每篇论文被切成多个段落,每段独立调用一次 AI,段落之间可以并发。

# 实际的并发模型
executor = ThreadPoolExecutor(max_workers=Config.MAX_AI_WORKERS)

future_to_idx = {
    executor.submit(self._process_single_paragraph, p, i): i
    for i, p in enumerate(paragraphs)
}
for future in as_completed(future_to_idx):
    idx = future_to_idx[future]
    results[idx] = future.result()

为什么不用 asyncio

如果用 FastAPI,自然会倾向于用 AsyncOpenAI + asyncio.gather() 来做并发。但这里有几个现实问题:

  1. 线程池已经够快:5~32 个线程并发调 API,瓶颈在 API 的速率限制(RPM/TPM),不在本地的并发模型。从 15 分钟压缩到 3 分钟,靠的是并发数,不是 asyncio 的事件循环效率
  2. Flask 上下文是线程局部的db.sessioncurrent_app 都绑定在线程上。线程池里每个线程可以通过 with app.app_context() 获得上下文,这是 Flask 的标准用法
  3. asyncio 和 Flask-SQLAlchemy 不兼容:如果用 async,数据库层要换成 sqlalchemy[asyncio] + aiosqlite/aiomysql,整个 ORM 层要重写

换句话说:选 asyncio 不会让 AI 调用更快,但会让数据库层变复杂。这是一个净负收益的选择。

同步 SDK 的简洁性

class AIClient:
    def create_completion(self, messages, temperature, stream=False):
        return self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            temperature=temperature,
            stream=stream
        )

同步调用,返回值直接可用,不需要 await,不需要考虑事件循环是否在运行。在 ThreadPoolExecutor 里调用时,每个线程独立阻塞等待 API 响应,互不干扰。


§ 5 Flask-SQLAlchemy 的双数据库切换

项目需要在 Server 模式用 MySQL、Desktop 模式用 SQLite,且上层代码完全无感知。Flask-SQLAlchemy 在这方面的成熟度是选择 Flask 的重要原因。

# config.py — 根据模式选择数据库
if DEPLOY_MODE == 'desktop':
    SQLALCHEMY_DATABASE_URI = f"sqlite:///{db_path}"
    SQLALCHEMY_ENGINE_OPTIONS = {}
else:
    SQLALCHEMY_DATABASE_URI = f"mysql+mysqlconnector://..."
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 16, "max_overflow": 24,
        "pool_recycle": 1800, "pool_pre_ping": True
    }

Flask-SQLAlchemy 的 db.init_app(app) 模式天然支持这种"启动时决定后端"的用法。而且它的连接池管理、会话生命周期(请求结束自动 remove())都是开箱即用的。

如果用 FastAPI,对应的方案是 sqlalchemy[asyncio] + 手动管理 AsyncSession。Desktop 模式下 SQLite 的异步驱动是 aiosqlite——一个把同步 SQLite 调用包装到线程里的库。用 async 包装 sync 再跑在 async 框架里,这种套娃增加了复杂度却没有带来实际收益。


§ 6 SSE 推送:Flask 的 Response Generator 天然适配

项目用 SSE 做实时进度推送。Flask 的实现方式极其自然:

@task_bp.route("/<int:task_id>/stream")
def stream_results(task_id):
    def generate():
        pubsub = redis_client.pubsub()
        pubsub.subscribe(RedisKeyManager.stream_channel(task_id))
        while True:
            msg = pubsub.get_message(timeout=1.0)
            if msg and msg["type"] == "message":
                yield f"data: {msg['data']}\n\n"
    return Response(generate(), mimetype="text/event-stream")

这是标准的 WSGI streaming response——generate() 是一个同步生成器,yield 一条数据就推一条给客户端。在 threaded=True 模式下,每个 SSE 连接占一个线程,简单直接。

FastAPI 的 SSE 方案对比

FastAPI 用 StreamingResponse + async generator

# FastAPI 的写法
@app.get("/stream/{task_id}")
async def stream(task_id: int):
    async def generate():
        pubsub = redis_client.pubsub()
        pubsub.subscribe(f"stream:{task_id}")
        while True:
            msg = pubsub.get_message(timeout=1.0)  # 问题:这是同步阻塞调用
            if msg:
                yield f"data: {msg['data']}\n\n"
            await asyncio.sleep(0.1)
    return StreamingResponse(generate(), media_type="text/event-stream")

注意这里的问题:pubsub.get_message()redis-py 的同步方法。在 async 函数里调用同步阻塞方法会阻塞整个事件循环。要正确实现,需要:

  • 换用 aioredis(已合并进 redis-py 5.x 的 async 模块),或者
  • asyncio.to_thread() 包装同步调用

而我的 Desktop 模式用的是自己写的 MemoryRedis,它的 pubsub.get_message() 内部是 queue.Queue.get(timeout=1.0)——一个纯同步的阻塞调用。要让它在 async 环境下正确工作,又要加一层 to_thread 包装。

Flask 的同步模型让 MemoryRedis 的 Pub/Sub 实现零摩擦地接入 SSE 推送,不需要任何 async 适配层。


§ 7 Desktop 模式下的线程模型

Desktop 模式的运行时架构是这样的:

Flask 主线程 (threaded=True)
├── HTTP 请求处理线程 × N
├── SSE 长连接线程 × M
└── MemoryQueue Worker 守护线程 × 1
    └── ThreadPoolExecutor (AI 并发调用) × max_workers

所有组件都在同一个进程内,通过线程间通信协作:

  • MemoryRedis 用 threading.Lock 保护共享数据
  • MemoryQueue 用 queue.Queue 做任务投递
  • Pub/Sub 用 queue.Queue 做消息分发

这个模型和 Flask 的 threaded=True 完美契合。Flask 的每个请求一个线程,Worker 是守护线程,AI 调用在线程池里——整个系统的并发单元都是线程,心智模型统一。

如果换成 FastAPI + uvicorn,运行时模型变成:

uvicorn 事件循环 (单线程)
├── async 请求处理 (协程)
├── async SSE 推送 (协程)
└── ??? Worker 怎么办 ???

Worker 线程里的同步代码(数据库操作、AI 调用)和 uvicorn 的事件循环之间需要桥接。要么用 asyncio.to_thread() 把同步代码扔到线程池,要么用 run_in_executor()——本质上又回到了线程模型,但多了一层 async 包装的复杂度。


§ 8 FastAPI 在这个场景下的"水土不服"

总结一下 FastAPI 在这个项目里会遇到的具体问题:

问题 根因 影响
uvloop 不支持 Windows FastAPI 的性能优势依赖 uvloop Desktop 模式性能退化到和 Flask 持平
PyInstaller 打包复杂 uvicorn + httptools + C 扩展 打包体积增大,兼容性风险高
MemoryRedis 是同步的 鸭子类型替身基于 threading.Lock 在 async 环境下需要 to_thread 包装
Flask-SQLAlchemy 不可用 FastAPI 推荐 async SQLAlchemy 数据库层要重写,双数据库切换变复杂
app_context 机制不同 FastAPI 用 Depends 注入 Worker 线程的上下文管理方式要重新设计
RQ 是同步的 RQ Worker 基于 fork + 同步执行 Server 模式的任务队列选型也要换

这些问题单独看都有解决方案,但叠加在一起就是大量的适配工作,且每一层适配都在把 async 代码桥接回 sync。最终的运行时效果和直接用 Flask 没有本质区别,但代码复杂度显著增加。


§ 9 实战踩坑

以下是项目的 git 提交记录,看看开发过程中实际遇到了哪些和 Flask 线程模型深度绑定的 bug——而这些 bug 恰恰证明了:即便在 Flask 的同步模型下,多线程协作已经够复杂了;如果再叠加一层 async 事件循环,复杂度会指数级上升。

Bug 1:SSE Generator 的 Working outside of application context

提交371bb01修复SSE上下文错误

Flask 的 SSE 推送用的是 Response Generator。问题在于:generate() 函数是一个惰性生成器,它的执行时机不是在路由函数返回时,而是在 Werkzeug 迭代响应体时。此时请求上下文可能已经被弹出。

# 修复前:generate() 内部直接调 db.session,报 application context 错误
@task_bp.route('/<int:task_id>/stream')
def stream_results(task_id):
    def generate():
        task = Task.query.get(task_id)  # 💥 Working outside of application context
        ...

# 修复后:在路由函数内捕获 app 引用,generate() 内手动 push context
def stream_results(task_id):
    app = current_app._get_current_object()
    def generate():
        with app.app_context():
            task = db.session.get(Task, task_id)  # ✅
            ...

和 FastAPI 的关系:FastAPI 的 StreamingResponse + async generator 也有类似问题——依赖注入(Depends)的生命周期在 generator 执行期间是否有效?FastAPI 的文档明确说 依赖的 cleanup 在 response 发送完毕后执行,但如果你在 async generator 里访问数据库 session,需要确保 session 的生命周期覆盖整个流式传输过程。这在 Flask 里用 with app.app_context() 一行解决,在 FastAPI 里需要更精细的依赖生命周期管理。

Bug 2:类型注解引入 import redis 导致 Desktop 打包崩溃

提交d8c3fdd移除 base_processor/progress_publisher/cancellation_checker 对 redis 的硬依赖

类型注解重构(2c6c779)时,给 BaseTaskProcessor 的构造函数加了 redis_client: redis.Redis 类型注解。这导致 Python 在 import 时就要执行 import redis——但 Desktop 模式根本没装 redis-py 这个包。

# 修复前:类型注解引入了硬依赖
import redis as redis_module

class BaseTaskProcessor(ABC):
    def __init__(self, task, redis_client: redis_module.Redis):  # 💥 Desktop 模式 import 失败
        ...

# 修复后:用 Any 替代,保持鸭子类型的灵活性
from typing import Any

class BaseTaskProcessor(ABC):
    def __init__(self, task: object, redis_client: Any):  # ✅ MemoryRedis 和 redis.Redis 都能传入
        ...

和 FastAPI 的关系:FastAPI 重度依赖 Pydantic 和类型注解做请求校验。如果用 FastAPI,你会更倾向于给所有参数加严格类型——但在双模式架构下,类型注解不能比运行时更严格redis_client: Any 看起来"不优雅",但它是鸭子类型替身模式的必要代价。FastAPI 的类型驱动哲学和这种"运行时多态"存在天然张力。

Bug 3:Worker 守护线程的孤儿任务

提交1bae53dWorker启动恢复卡死任务

Desktop 模式下 MemoryQueue 的 worker 是 daemon=True 的守护线程。用户直接关闭窗口时,守护线程立即被杀,正在处理的任务永远停留在 processing 状态。下次启动时这些"孤儿任务"不会被重新消费。

# run_worker.py — 启动时扫描并恢复孤儿任务
def recover_orphan_tasks():
    orphans = Task.query.filter_by(status='processing').all()
    for task in orphans:
        task.status = 'failed'
    db.session.commit()

和 FastAPI 的关系:这个 bug 本身和框架无关,但修复方案依赖 Flask 的 app_context()。Worker 启动时需要访问数据库,Flask 的 with app.app_context() 是标准做法。如果用 FastAPI,没有对应的"应用上下文"概念——你需要手动创建 SQLAlchemy session,或者用 contextmanager 包装,代码路径更长。

Bug 4:SSE 心跳保活与 Nginx 超时

提交813a194SSE心跳保活、断线自动重连

SSE 长连接在 Nginx 反向代理后面,如果一段时间没有数据推送,Nginx 会认为连接空闲并主动断开。修复方案是在 Flask 的 SSE generator 里加心跳:

# 每 30 秒发一个心跳,防止 Nginx 断开空闲连接
if now - last_heartbeat > heartbeat_interval:
    yield f": heartbeat\n\n"
    last_heartbeat = now

和 FastAPI 的关系:这个心跳逻辑在 Flask 的同步 generator 里是一个简单的 time.time() 判断 + yield。如果在 FastAPI 的 async generator 里实现,需要用 asyncio.sleep() 而不是阻塞等待——但 pubsub.get_message(timeout=5) 本身是同步阻塞的。你要么把它包进 to_thread(),要么换用 async redis client。每一个"简单"的同步操作,在 async 环境下都需要额外的桥接代码。

Bug 5:flask_sqlalchemy 打包缺失

提交61a0e55修复 flask_sqlalchemy 打包缺失

PyInstaller 打包时,flask_sqlalchemy 的模板文件和元数据没有被自动收集,导致 EXE 启动时 db.init_app(app) 报错。修复方案是在 .spec 文件里用 collect_all

# AcademicPolisher.spec
from PyInstaller.utils.hooks import collect_all

flask_sqla_datas, flask_sqla_binaries, flask_sqla_hiddenimports = collect_all('flask_sqlalchemy')
flask_cors_datas, flask_cors_binaries, flask_cors_hiddenimports = collect_all('flask_cors')

和 FastAPI 的关系flask_sqlalchemyflask_cors 是 Flask 生态里最成熟的扩展,PyInstaller 社区对它们的 hook 支持相对完善(虽然仍需手动 collect_all)。如果换成 FastAPI 的对应方案(sqlalchemy[asyncio] + starlette CORS middleware),PyInstaller 的 hook 覆盖度更低,踩坑成本更高。

小结:复杂度守恒

这 5 个 bug 有一个共同特点:它们都发生在"线程边界"或"上下文边界"上。Flask 的同步模型已经让这些边界问题足够棘手了。如果再引入 async 事件循环,边界会从"线程 vs 线程"变成"协程 vs 线程 vs 事件循环"——三方博弈,调试难度不是线性增长,而是组合爆炸。

Flask 让我只需要处理一种并发模型(线程),一种上下文机制(app_context),一种阻塞语义(同步阻塞)。这不是"落后",而是在约束条件下的复杂度最小化


§ 10 什么时候我会选 FastAPI

公平地说,FastAPI 在以下场景下是更好的选择:

  1. 纯 API 服务:不需要打包成桌面应用,只跑在 Linux 服务器上
  2. 高并发网关:大量轻量级请求需要高吞吐,uvloop 的优势能发挥出来
  3. 全异步技术栈:数据库用 async driver,HTTP 调用用 httpx.AsyncClient,消息队列用 aio-pika——整条链路都是 async
  4. 自动文档需求强:FastAPI 的 OpenAPI 自动生成确实比 Flask 方便
  5. 类型校验密集:Pydantic 模型对复杂请求体的校验比 Flask 手写 request.json.get() 优雅

但这些优势在我的项目里要么不适用(不是纯 API 服务),要么收益有限(AI 调用是瓶颈,不是框架吞吐量)。


§ 11 小结

技术选型不是选"最好的",而是选"约束条件下阻力最小的"。回顾这个项目的决策链:

  1. 双模式架构 → 需要 PyInstaller 打包 → Flask 的打包生态更成熟
  2. Desktop 模式 → Windows 上没有 uvloop → FastAPI 的性能优势消失
  3. MemoryRedis 替身 → 基于 threading.Lock 的同步实现 → 和 Flask 的同步模型天然兼容
  4. ThreadPoolExecutor 并发 → AI 调用是 IO 密集型 → 线程池够用,不需要 asyncio
  5. Flask-SQLAlchemy → 双数据库一行切换 → 换 async SQLAlchemy 要重写整个 ORM 层
  6. RQ 任务队列 → 同步 Worker 进程 → 和 Flask 的同步生态一脉相承

Flask 不是因为"更好"而被选中,而是因为它和项目的每一个约束条件都不冲突。 FastAPI 在纯 async 场景下确实更强,但当你的基础设施(Redis 替身、任务队列、数据库驱动)全部是同步的,强行引入 async 框架只会增加桥接层的复杂度,而不会带来实际的性能提升。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐