从双模式架构谈技术选型:为什么我的系统后端选了 Flask 而不是 FastAPI
从双模式架构谈技术选型:为什么我的系统后端选了 Flask 而不是 FastAPI
不是 FastAPI 不好,而是 Flask 在这个项目的约束条件下,是阻力最小的路径。
项目地址:AI Academic Polisher
目录
- § 1 背景:这不是一个典型的 API 服务
- § 2 双模式架构的核心约束
- § 3 PyInstaller 打包:Flask 的生态优势
- § 4 同步 OpenAI SDK + ThreadPoolExecutor:够用就是最优解
- § 5 Flask-SQLAlchemy 的双数据库切换
- § 6 SSE 推送:Flask 的 Response Generator 天然适配
- § 7 Desktop 模式下的线程模型
- § 8 FastAPI 在这个场景下的"水土不服"
- § 9 实战踩坑
- § 10 什么时候我会选 FastAPI
- § 11 小结
§ 1 背景:这不是一个典型的 API 服务
项目 AI Academic Polisher 是一个学术论文润色系统,核心功能是把 AI 生成的"机器味"文本改写成符合人类学术写作习惯的表达,从而降低 AIGC 检测率。
它有一个不太常见的架构约束:一套代码要同时跑在两种完全不同的环境里。
| 模式 | 运行环境 | 基础设施 | 用户画像 |
|---|---|---|---|
| Server | Linux + Docker | Flask + Gunicorn + MySQL + Redis + RQ Worker | 技术人员,多用户共享 |
| Desktop | Windows EXE | Flask (threaded) + SQLite + MemoryRedis + MemoryQueue | 非技术用户,双击即用 |
这意味着后端框架的选型不能只看"谁性能高"或"谁更现代",而要看谁能在这两种截然不同的运行时环境下都稳定工作,且切换成本最低。
§ 2 双模式架构的核心约束
在做技术选型之前,先明确这个项目的几个硬约束:
- Desktop 模式必须打包成单个 EXE:用 PyInstaller 打包,用户双击就能跑,不装 Python、不装数据库、不装 Redis
- 同步 AI 调用为主:使用 OpenAI 兼容 API(支持官方、代理、Ollama),SDK 的同步版本基于
httpx/requests - 线程池并发:长文档的多段落并发用
ThreadPoolExecutor,不是 asyncio - SSE 实时推送:进度通过 Server-Sent Events 推给前端,需要 HTTP 长连接
- Flask 应用上下文:数据库操作依赖
app_context(),跨线程时需要手动注入
这些约束叠加在一起,指向了一个结论:这个项目的并发模型是"同步 + 线程池",不是"异步 + 事件循环"。
§ 3 PyInstaller 打包:Flask 的生态优势
Desktop 模式的核心交付物是一个 Windows EXE。PyInstaller 是 Python 桌面打包的事实标准,而 Flask 在这个生态里的兼容性远好于 FastAPI。
Flask 打包的现状
Flask 本身是纯同步框架,依赖链简单:Werkzeug + Jinja2 + click。PyInstaller 对这些包的 hook 支持成熟,社区踩坑经验丰富。我的项目里需要手动声明的 hiddenimports 主要来自 SQLAlchemy 和 OpenAI SDK,Flask 本身几乎不需要额外处理。
FastAPI 打包的潜在问题
FastAPI 的依赖链更长:uvicorn → uvloop(Linux only)/ asyncio → httptools → starlette。其中:
- uvloop 不支持 Windows:FastAPI 在 Windows 上只能用
asyncio的默认事件循环,性能优势直接消失 - uvicorn 的多进程模型:
uvicorn --workers N在 PyInstaller 打包后行为不可预测,因为multiprocessing.freeze_support()的兼容性问题 - C 扩展依赖:
httptools、websockets等包含编译的 C 扩展,PyInstaller 打包时需要额外处理 DLL 依赖
Flask 的 app.run(threaded=True) 在 Desktop 模式下就是一个简单的多线程 WSGI 服务器,没有事件循环、没有 C 扩展、没有多进程——打包后的行为和开发时完全一致。
# main.py — Desktop 模式启动,简单到不需要解释
if Config.DEPLOY_MODE == 'desktop':
task_queue.start_worker(app)
app.run(host='127.0.0.1', port=Config.APP_PORT, threaded=True)
§ 4 同步 OpenAI SDK + ThreadPoolExecutor:够用就是最优解
项目的核心 IO 操作是调用 AI API。每篇论文被切成多个段落,每段独立调用一次 AI,段落之间可以并发。
# 实际的并发模型
executor = ThreadPoolExecutor(max_workers=Config.MAX_AI_WORKERS)
future_to_idx = {
executor.submit(self._process_single_paragraph, p, i): i
for i, p in enumerate(paragraphs)
}
for future in as_completed(future_to_idx):
idx = future_to_idx[future]
results[idx] = future.result()
为什么不用 asyncio
如果用 FastAPI,自然会倾向于用 AsyncOpenAI + asyncio.gather() 来做并发。但这里有几个现实问题:
- 线程池已经够快:5~32 个线程并发调 API,瓶颈在 API 的速率限制(RPM/TPM),不在本地的并发模型。从 15 分钟压缩到 3 分钟,靠的是并发数,不是 asyncio 的事件循环效率
- Flask 上下文是线程局部的:
db.session、current_app都绑定在线程上。线程池里每个线程可以通过with app.app_context()获得上下文,这是 Flask 的标准用法 - asyncio 和 Flask-SQLAlchemy 不兼容:如果用 async,数据库层要换成
sqlalchemy[asyncio]+aiosqlite/aiomysql,整个 ORM 层要重写
换句话说:选 asyncio 不会让 AI 调用更快,但会让数据库层变复杂。这是一个净负收益的选择。
同步 SDK 的简洁性
class AIClient:
def create_completion(self, messages, temperature, stream=False):
return self.client.chat.completions.create(
model=self.model_name,
messages=messages,
temperature=temperature,
stream=stream
)
同步调用,返回值直接可用,不需要 await,不需要考虑事件循环是否在运行。在 ThreadPoolExecutor 里调用时,每个线程独立阻塞等待 API 响应,互不干扰。
§ 5 Flask-SQLAlchemy 的双数据库切换
项目需要在 Server 模式用 MySQL、Desktop 模式用 SQLite,且上层代码完全无感知。Flask-SQLAlchemy 在这方面的成熟度是选择 Flask 的重要原因。
# config.py — 根据模式选择数据库
if DEPLOY_MODE == 'desktop':
SQLALCHEMY_DATABASE_URI = f"sqlite:///{db_path}"
SQLALCHEMY_ENGINE_OPTIONS = {}
else:
SQLALCHEMY_DATABASE_URI = f"mysql+mysqlconnector://..."
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_size": 16, "max_overflow": 24,
"pool_recycle": 1800, "pool_pre_ping": True
}
Flask-SQLAlchemy 的 db.init_app(app) 模式天然支持这种"启动时决定后端"的用法。而且它的连接池管理、会话生命周期(请求结束自动 remove())都是开箱即用的。
如果用 FastAPI,对应的方案是 sqlalchemy[asyncio] + 手动管理 AsyncSession。Desktop 模式下 SQLite 的异步驱动是 aiosqlite——一个把同步 SQLite 调用包装到线程里的库。用 async 包装 sync 再跑在 async 框架里,这种套娃增加了复杂度却没有带来实际收益。
§ 6 SSE 推送:Flask 的 Response Generator 天然适配
项目用 SSE 做实时进度推送。Flask 的实现方式极其自然:
@task_bp.route("/<int:task_id>/stream")
def stream_results(task_id):
def generate():
pubsub = redis_client.pubsub()
pubsub.subscribe(RedisKeyManager.stream_channel(task_id))
while True:
msg = pubsub.get_message(timeout=1.0)
if msg and msg["type"] == "message":
yield f"data: {msg['data']}\n\n"
return Response(generate(), mimetype="text/event-stream")
这是标准的 WSGI streaming response——generate() 是一个同步生成器,yield 一条数据就推一条给客户端。在 threaded=True 模式下,每个 SSE 连接占一个线程,简单直接。
FastAPI 的 SSE 方案对比
FastAPI 用 StreamingResponse + async generator:
# FastAPI 的写法
@app.get("/stream/{task_id}")
async def stream(task_id: int):
async def generate():
pubsub = redis_client.pubsub()
pubsub.subscribe(f"stream:{task_id}")
while True:
msg = pubsub.get_message(timeout=1.0) # 问题:这是同步阻塞调用
if msg:
yield f"data: {msg['data']}\n\n"
await asyncio.sleep(0.1)
return StreamingResponse(generate(), media_type="text/event-stream")
注意这里的问题:pubsub.get_message() 是 redis-py 的同步方法。在 async 函数里调用同步阻塞方法会阻塞整个事件循环。要正确实现,需要:
- 换用
aioredis(已合并进redis-py5.x 的 async 模块),或者 - 用
asyncio.to_thread()包装同步调用
而我的 Desktop 模式用的是自己写的 MemoryRedis,它的 pubsub.get_message() 内部是 queue.Queue.get(timeout=1.0)——一个纯同步的阻塞调用。要让它在 async 环境下正确工作,又要加一层 to_thread 包装。
Flask 的同步模型让 MemoryRedis 的 Pub/Sub 实现零摩擦地接入 SSE 推送,不需要任何 async 适配层。
§ 7 Desktop 模式下的线程模型
Desktop 模式的运行时架构是这样的:
Flask 主线程 (threaded=True)
├── HTTP 请求处理线程 × N
├── SSE 长连接线程 × M
└── MemoryQueue Worker 守护线程 × 1
└── ThreadPoolExecutor (AI 并发调用) × max_workers
所有组件都在同一个进程内,通过线程间通信协作:
- MemoryRedis 用
threading.Lock保护共享数据 - MemoryQueue 用
queue.Queue做任务投递 - Pub/Sub 用
queue.Queue做消息分发
这个模型和 Flask 的 threaded=True 完美契合。Flask 的每个请求一个线程,Worker 是守护线程,AI 调用在线程池里——整个系统的并发单元都是线程,心智模型统一。
如果换成 FastAPI + uvicorn,运行时模型变成:
uvicorn 事件循环 (单线程)
├── async 请求处理 (协程)
├── async SSE 推送 (协程)
└── ??? Worker 怎么办 ???
Worker 线程里的同步代码(数据库操作、AI 调用)和 uvicorn 的事件循环之间需要桥接。要么用 asyncio.to_thread() 把同步代码扔到线程池,要么用 run_in_executor()——本质上又回到了线程模型,但多了一层 async 包装的复杂度。
§ 8 FastAPI 在这个场景下的"水土不服"
总结一下 FastAPI 在这个项目里会遇到的具体问题:
| 问题 | 根因 | 影响 |
|---|---|---|
| uvloop 不支持 Windows | FastAPI 的性能优势依赖 uvloop | Desktop 模式性能退化到和 Flask 持平 |
| PyInstaller 打包复杂 | uvicorn + httptools + C 扩展 | 打包体积增大,兼容性风险高 |
| MemoryRedis 是同步的 | 鸭子类型替身基于 threading.Lock | 在 async 环境下需要 to_thread 包装 |
| Flask-SQLAlchemy 不可用 | FastAPI 推荐 async SQLAlchemy | 数据库层要重写,双数据库切换变复杂 |
| app_context 机制不同 | FastAPI 用 Depends 注入 | Worker 线程的上下文管理方式要重新设计 |
| RQ 是同步的 | RQ Worker 基于 fork + 同步执行 | Server 模式的任务队列选型也要换 |
这些问题单独看都有解决方案,但叠加在一起就是大量的适配工作,且每一层适配都在把 async 代码桥接回 sync。最终的运行时效果和直接用 Flask 没有本质区别,但代码复杂度显著增加。
§ 9 实战踩坑
以下是项目的 git 提交记录,看看开发过程中实际遇到了哪些和 Flask 线程模型深度绑定的 bug——而这些 bug 恰恰证明了:即便在 Flask 的同步模型下,多线程协作已经够复杂了;如果再叠加一层 async 事件循环,复杂度会指数级上升。
Bug 1:SSE Generator 的 Working outside of application context
提交:371bb01 — 修复SSE上下文错误
Flask 的 SSE 推送用的是 Response Generator。问题在于:generate() 函数是一个惰性生成器,它的执行时机不是在路由函数返回时,而是在 Werkzeug 迭代响应体时。此时请求上下文可能已经被弹出。
# 修复前:generate() 内部直接调 db.session,报 application context 错误
@task_bp.route('/<int:task_id>/stream')
def stream_results(task_id):
def generate():
task = Task.query.get(task_id) # 💥 Working outside of application context
...
# 修复后:在路由函数内捕获 app 引用,generate() 内手动 push context
def stream_results(task_id):
app = current_app._get_current_object()
def generate():
with app.app_context():
task = db.session.get(Task, task_id) # ✅
...
和 FastAPI 的关系:FastAPI 的 StreamingResponse + async generator 也有类似问题——依赖注入(Depends)的生命周期在 generator 执行期间是否有效?FastAPI 的文档明确说 依赖的 cleanup 在 response 发送完毕后执行,但如果你在 async generator 里访问数据库 session,需要确保 session 的生命周期覆盖整个流式传输过程。这在 Flask 里用 with app.app_context() 一行解决,在 FastAPI 里需要更精细的依赖生命周期管理。
Bug 2:类型注解引入 import redis 导致 Desktop 打包崩溃
提交:d8c3fdd — 移除 base_processor/progress_publisher/cancellation_checker 对 redis 的硬依赖
类型注解重构(2c6c779)时,给 BaseTaskProcessor 的构造函数加了 redis_client: redis.Redis 类型注解。这导致 Python 在 import 时就要执行 import redis——但 Desktop 模式根本没装 redis-py 这个包。
# 修复前:类型注解引入了硬依赖
import redis as redis_module
class BaseTaskProcessor(ABC):
def __init__(self, task, redis_client: redis_module.Redis): # 💥 Desktop 模式 import 失败
...
# 修复后:用 Any 替代,保持鸭子类型的灵活性
from typing import Any
class BaseTaskProcessor(ABC):
def __init__(self, task: object, redis_client: Any): # ✅ MemoryRedis 和 redis.Redis 都能传入
...
和 FastAPI 的关系:FastAPI 重度依赖 Pydantic 和类型注解做请求校验。如果用 FastAPI,你会更倾向于给所有参数加严格类型——但在双模式架构下,类型注解不能比运行时更严格。redis_client: Any 看起来"不优雅",但它是鸭子类型替身模式的必要代价。FastAPI 的类型驱动哲学和这种"运行时多态"存在天然张力。
Bug 3:Worker 守护线程的孤儿任务
提交:1bae53d — Worker启动恢复卡死任务
Desktop 模式下 MemoryQueue 的 worker 是 daemon=True 的守护线程。用户直接关闭窗口时,守护线程立即被杀,正在处理的任务永远停留在 processing 状态。下次启动时这些"孤儿任务"不会被重新消费。
# run_worker.py — 启动时扫描并恢复孤儿任务
def recover_orphan_tasks():
orphans = Task.query.filter_by(status='processing').all()
for task in orphans:
task.status = 'failed'
db.session.commit()
和 FastAPI 的关系:这个 bug 本身和框架无关,但修复方案依赖 Flask 的 app_context()。Worker 启动时需要访问数据库,Flask 的 with app.app_context() 是标准做法。如果用 FastAPI,没有对应的"应用上下文"概念——你需要手动创建 SQLAlchemy session,或者用 contextmanager 包装,代码路径更长。
Bug 4:SSE 心跳保活与 Nginx 超时
提交:813a194 — SSE心跳保活、断线自动重连
SSE 长连接在 Nginx 反向代理后面,如果一段时间没有数据推送,Nginx 会认为连接空闲并主动断开。修复方案是在 Flask 的 SSE generator 里加心跳:
# 每 30 秒发一个心跳,防止 Nginx 断开空闲连接
if now - last_heartbeat > heartbeat_interval:
yield f": heartbeat\n\n"
last_heartbeat = now
和 FastAPI 的关系:这个心跳逻辑在 Flask 的同步 generator 里是一个简单的 time.time() 判断 + yield。如果在 FastAPI 的 async generator 里实现,需要用 asyncio.sleep() 而不是阻塞等待——但 pubsub.get_message(timeout=5) 本身是同步阻塞的。你要么把它包进 to_thread(),要么换用 async redis client。每一个"简单"的同步操作,在 async 环境下都需要额外的桥接代码。
Bug 5:flask_sqlalchemy 打包缺失
提交:61a0e55 — 修复 flask_sqlalchemy 打包缺失
PyInstaller 打包时,flask_sqlalchemy 的模板文件和元数据没有被自动收集,导致 EXE 启动时 db.init_app(app) 报错。修复方案是在 .spec 文件里用 collect_all:
# AcademicPolisher.spec
from PyInstaller.utils.hooks import collect_all
flask_sqla_datas, flask_sqla_binaries, flask_sqla_hiddenimports = collect_all('flask_sqlalchemy')
flask_cors_datas, flask_cors_binaries, flask_cors_hiddenimports = collect_all('flask_cors')
和 FastAPI 的关系:flask_sqlalchemy 和 flask_cors 是 Flask 生态里最成熟的扩展,PyInstaller 社区对它们的 hook 支持相对完善(虽然仍需手动 collect_all)。如果换成 FastAPI 的对应方案(sqlalchemy[asyncio] + starlette CORS middleware),PyInstaller 的 hook 覆盖度更低,踩坑成本更高。
小结:复杂度守恒
这 5 个 bug 有一个共同特点:它们都发生在"线程边界"或"上下文边界"上。Flask 的同步模型已经让这些边界问题足够棘手了。如果再引入 async 事件循环,边界会从"线程 vs 线程"变成"协程 vs 线程 vs 事件循环"——三方博弈,调试难度不是线性增长,而是组合爆炸。
Flask 让我只需要处理一种并发模型(线程),一种上下文机制(app_context),一种阻塞语义(同步阻塞)。这不是"落后",而是在约束条件下的复杂度最小化。
§ 10 什么时候我会选 FastAPI
公平地说,FastAPI 在以下场景下是更好的选择:
- 纯 API 服务:不需要打包成桌面应用,只跑在 Linux 服务器上
- 高并发网关:大量轻量级请求需要高吞吐,uvloop 的优势能发挥出来
- 全异步技术栈:数据库用 async driver,HTTP 调用用
httpx.AsyncClient,消息队列用aio-pika——整条链路都是 async - 自动文档需求强:FastAPI 的 OpenAPI 自动生成确实比 Flask 方便
- 类型校验密集:Pydantic 模型对复杂请求体的校验比 Flask 手写
request.json.get()优雅
但这些优势在我的项目里要么不适用(不是纯 API 服务),要么收益有限(AI 调用是瓶颈,不是框架吞吐量)。
§ 11 小结
技术选型不是选"最好的",而是选"约束条件下阻力最小的"。回顾这个项目的决策链:
- 双模式架构 → 需要 PyInstaller 打包 → Flask 的打包生态更成熟
- Desktop 模式 → Windows 上没有 uvloop → FastAPI 的性能优势消失
- MemoryRedis 替身 → 基于 threading.Lock 的同步实现 → 和 Flask 的同步模型天然兼容
- ThreadPoolExecutor 并发 → AI 调用是 IO 密集型 → 线程池够用,不需要 asyncio
- Flask-SQLAlchemy → 双数据库一行切换 → 换 async SQLAlchemy 要重写整个 ORM 层
- RQ 任务队列 → 同步 Worker 进程 → 和 Flask 的同步生态一脉相承
Flask 不是因为"更好"而被选中,而是因为它和项目的每一个约束条件都不冲突。 FastAPI 在纯 async 场景下确实更强,但当你的基础设施(Redis 替身、任务队列、数据库驱动)全部是同步的,强行引入 async 框架只会增加桥接层的复杂度,而不会带来实际的性能提升。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)