FastAPI+ORM分层架构入门教程
前言
告别「意大利面式」代码!FastAPI+ORM分层架构,新手也能写出大厂级代码
还在把FastAPI和ORM代码全塞在main.py里?改一行代码要翻几百行,参数校验全靠print,数据库操作一不小心就翻车?
我曾踩过所有新手都会掉的坑:接口和数据库逻辑揉成一团,查个bug要从路由追到SQL,新增字段要改遍整个文件…直到用「分层架构」重构后,代码清爽到像开了倍速!
今天就分享一套「拿来即用」的FastAPI+ORM分层模板,不用死记硬背,复制就能跑,不管是课程作业还是练手项目,都能让你的代码从「乱炖」变「精致套餐」~
一、项目结构
ORM/
config/
├── config.py # 数据库连接配置
models/
├── Base.py # 这里的代码,所有模型的基类
└── Book.py #具体的数据库表模型
routers/
├── urls.py #路由接口
schema/
├── jiben.py # Pydantic 数据验证集
main.py #项目入口文件
二、分层结构是什么?
| 文件夹 | 负责什么 | 作用 |
|---|---|---|
| config | 数据库连接、全局配置 | 不用每次都写一遍连接代码 |
| models | ORM 数据表模型 | 定义用户表、部门表这些数据库结构 |
| schema | 请求 / 响应数据校验 | 检查前端传的参数合不合法 |
| routers | 路由和业务逻辑 | 写接口、实现增删改查 |
| main.py | 项目入口 | 启动 FastAPI,把所有路由拼起来 |
这样分好之后,找问题、改代码都非常方便,再也不用面对一个几千行的文件了!
三、分层架构完整代码
3.1.config/config.py–数据库配置
- 从 SQLAlchemy 异步扩展库中导入 3 个核心工具:
- create_async_engine:创建异步数据库引擎(负责建立数据库连接)
- AsyncSession:异步会话类(真正执行 SQL 的操作对象)
- async_sessionmaker:异步会话工厂(批量生产安全的数据库会话
格式:数据库类型+驱动://用户名:密码@主机:端口/数据库名?字符集# 数据库连接配置 DATABASE_URL = "mysql+aiomysql://root:1130688@localhost:3306/fastapi_orm?charset=utf8mb4"
# 数据库连接配置
DATABASE_URL = "mysql+aiomysql://root:密码@localhost:3306/fastapi_orm?charset=utf8mb4"
database_engine = create_async_engine(
DATABASE_URL,
echo=True, # 打印执行的 SQL 语句(开发调试用)
pool_size=10, # 连接池大小(保持 10 个长连接)
max_overflow=20 # 最大溢出连接(高峰期最多再开 20 个)
)
作用:生产安全的、可直接使用的异步数据库会话
- engine(引擎) 是 SQLAlchemy 的 “数据库入口”
负责:建立连接、管理连接池、执行 SQL 底层通信 - echo=True:开发时能看到执行了什么 SQL,方便调试
- pool_size 和 max_overflow 是高并发优化配置
- async_sessionmaker 是异步会话工厂
作用:生产安全的、可直接使用的异步数据库会话,每次请求 API 都会从这里拿一个会话
AsyncSession_as = async_sessionmaker(
bind=database_engine, # 绑定到上面创建的数据库引擎
class_=AsyncSession, # 使用异步会话类
expire_on_commit=False # 提交后不销毁对象(非常重要)
)
- FastAPI 依赖项
用于给每个接口提供数据库会话,是最关键的部分
#异步会话获取器
async def get_data():
async with AsyncSession_as()as A:#创建一个异步数据库会话,命名为 A,并自动管理生命周期
try:
yield A#把数据库会话交给路由使用(这就是依赖注入)
await A.commit()#操作完成后提交事务(保存修改)
except Exception:
await A.rollback()#如果操作过程中发生异常,回滚事务(撤销修改)
raise#把错误抛出去,让 FastAPI 处理
finally:
await A.close()#无论是否发生异常,都关闭会话(释放资源)
3.2.models --ORM模型(数据表)
Base.py 是所有数据库模型的父类,Book.py 里的模型会继承它,自动拥有创建时间、修改时间这两个公共字段。
3.2.1.Base.py
from datetime import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(
DateTime,
insert_default=func.now(),
default=func.now,
comment="创建时间"
)
update_time: Mapped[datetime] = mapped_column(
DateTime,
insert_default=func.now(),
default=func.now,
onupdate=func.now(),
comment="修改时间"
)
这是你的公共模型基类,所有数据表模型都继承它,就不用每个表都重复写创建时间、修改时间了。
字段参数详解:
- create_time: Mapped[datetime]
类型注解:这个字段是 datetime 类型,对应数据库 DATETIME。
- insert_default=func.now():插入数据时,自动用数据库的 NOW() 函数生成当前时间,作为创建时间。
- default=func.now:Python 侧的默认值,ORM 新增对象时,如果不指定时间,会自动赋值。
- comment=“创建时间”:给数据库列加注释,方便维护。
- update_time: Mapped[datetime]
- 前三个参数和 create_time 一样,保证新增数据时也会写入初始修改时间。
- onupdate=func.now():更新数据时,自动用数据库的 NOW() 函数更新这个字段,实现 “修改时间自动更新”。
3.2.2.Book.py
from sqlalchemy import Integer, String,Float
from sqlalchemy.orm import mapped_column, Mapped
from models.Base import Base
class Book(Base):
__tablename__ = "book"#数据库表名:book
id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float, comment="价格")
publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
3.3. routers/–路由与接口逻辑
3.3.1.导入部分
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from config.config import get_data # 数据库会话依赖
from models.Book import Book # 图书模型
from schema.jiben import updata_book # Pydantic 验证模型
- APIRouter:用来拆分路由,让代码更整洁
- Depends(get_data):自动注入数据库会话,不用手动开关
- func:用来做聚合查询(计数、平均值等)
3.3.2.查询接口大全
1.查询–获取图书
@orm_fast.get("/find/aa")
async def find_aa(session:AsyncSession=Depends(get_data)):
result=await session.execute(select(Book))
return result.scalars().all()
select(Book) = SELECT * FROM book
scalars().all():返回所有数据列表
如果获取单条数据:scalars().first() 或者get(模型类,主键值)
2.条件查询
比较判断:==; >; <; >=; <=等
条件查询:价格大于等于200的Book
@orm_fast.get("/find/bc")
async def find_bc(session:AsyncSession=Depends(get_data)):
result=await session.execute(select(Book).where(Book.price>=200))
return result.scalars().all()
多条件:&=AND且;|;~;in_()等。
3.模糊查询–like()
%:零个、一个或多个字符
_:一个单个字符
模糊查询:#作者以‘曹’开头且价格大于100的Book
@orm_fast.get("/find/be")
async def find_be(session:AsyncSession=Depends(get_data)):
result=await session.execute(select(Book).where(Book.author.like("曹%")& Book.price>100))
result1=result.scalars().all()
return result1
4.聚合查询–func.方法()
聚合计算:func.方法(模型类.属性)
count:统计行数量
avg:求平均值
max:求最大值
min:求最小值
sum:求和
@orm_fast.get("/find/bf")
async def find_bf(session:AsyncSession=Depends(get_data)):
total=await session.execute(select(func.count(Book.id)))
avg=await session.execute(select(func.avg(Book.price)))
return {
"total":total.scalar_one_or_none(),
"average":avg.scalar_one_or_none()
}
5.分页查询
@orm_fast.get("/find/bg")
async def find_bg(page:int=1,page_size:int=2,bd:AsyncSession=Depends(get_data)):
skip=(page-1)*page_size
stmit=select(Book).offset(skip).limit(page_size)
result=await bd.execute(stmit)
return result.scalars().all()
offset():跳过多少条
limit():取多少条
3.3.3.增删改接口
1.添加图书
@orm_fast.post("/find/bh")
async def find_bh(user_book:updata_book,bd:AsyncSession=Depends(get_data)):
book_obj=Book(**user_book.__dict__)
bd.add(book_obj)
await bd.commit()
return book_obj
user_book.__dict__:把 Pydantic 数据解包给 ORM 模型;自动生成 create_time 和 update_time
2.修改图书
@orm_fast.put("/find/update_bi")
async def find_update_bi(user_book:updata_book,bd:AsyncSession=Depends(get_data)):
u_books=await bd.get(Book,user_book.id)
if not u_books:
return {"msg":"图书不存在"}
# 赋值更新
u_books.bookname=user_book.bookname
u_books.author=user_book.author
u_books.price=user_book.price
u_books.publisher=user_book.publisher
await bd.commit()
return {"msg":"更新成功"}
重点:update_time 会自动更新!(基类里的 onupdate=func.now ())
3.删除图书
@orm_fast.delete("/find/delete_bi/{id}")
async def find_delete_bi(id:int,bd:AsyncSession=Depends(get_data)):
u_books=await bd.get(Book,id)
if not u_books:
return {"msg":"图书不存在"}
await bd.delete(u_books)
await bd.commit()
return {"msg":"删除成功"}
3.4.schema/jiben.py–数据校验
from pydantic import BaseModel
class updata_book(BaseModel):
id:int
bookname:str
author:str
price:float
publisher:str
这是一个数据验证类,专门用来接收前端传过来的图书信息。
它会自动校验前端传的参数类型对不对:
- id 必须是整数
- bookname、author、publisher 必须是字符串
- price 必须是浮点数(小数)
作用:
- 前端传 JSON → 自动转成 Python 对象
- 类型不对直接报错(FastAPI 自动返回 422 错误)
- 自动生成接口文档(Swagger UI 能直接看到字段)
3.5.main.py–项目入口
from contextlib import asynccontextmanager#导入异步上下文管理器,专门用来写项目启动 / 关闭时要做的事
from fastapi import FastAPI
from config.config import database_engine
from models.Base import Base
from routers.urls import orm_fast
@asynccontextmanager
async def lifespan(app: FastAPI):
# 项目【启动时】执行一次
async with database_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 项目运行中……(接收请求、跑接口)
yield
# 项目【关闭时】执行一次
await database_engine.dispose()
app=FastAPI(lifespan=lifespan)
app.include_router(orm_fast,prefix="/orm",tags=["orm_APP"])
四、分层架构的小总结
用了分层架构之后,我最大的感受就是:
- 代码不乱了,找 bug 快多了
- 后续加功能(比如分页、登录),直接在对应层加就行
- 写课程作业、练手项目都能用,结构也很规范
这套模板把 ORM、外键、增删改查都包含了,新手入门完全够用!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)