前言

告别「意大利面式」代码!FastAPI+ORM分层架构,新手也能写出大厂级代码

还在把FastAPI和ORM代码全塞在main.py里?改一行代码要翻几百行,参数校验全靠print,数据库操作一不小心就翻车?
我曾踩过所有新手都会掉的坑:接口和数据库逻辑揉成一团,查个bug要从路由追到SQL,新增字段要改遍整个文件…直到用「分层架构」重构后,代码清爽到像开了倍速!
今天就分享一套「拿来即用」的FastAPI+ORM分层模板,不用死记硬背,复制就能跑,不管是课程作业还是练手项目,都能让你的代码从「乱炖」变「精致套餐」~

一、项目结构

ORM/
config/
├── config.py # 数据库连接配置
models/
├── Base.py # 这里的代码,所有模型的基类
└── Book.py #具体的数据库表模型
routers/
├── urls.py #路由接口
schema/
├── jiben.py # Pydantic 数据验证集
main.py #项目入口文件

二、分层结构是什么?

文件夹 负责什么 作用
config 数据库连接、全局配置 不用每次都写一遍连接代码
models ORM 数据表模型 定义用户表、部门表这些数据库结构
schema 请求 / 响应数据校验 检查前端传的参数合不合法
routers 路由和业务逻辑 写接口、实现增删改查
main.py 项目入口 启动 FastAPI,把所有路由拼起来

这样分好之后,找问题、改代码都非常方便,再也不用面对一个几千行的文件了!

三、分层架构完整代码

3.1.config/config.py–数据库配置

  1. 从 SQLAlchemy 异步扩展库中导入 3 个核心工具:
  • create_async_engine:创建异步数据库引擎(负责建立数据库连接)
  • AsyncSession:异步会话类(真正执行 SQL 的操作对象)
  • async_sessionmaker:异步会话工厂(批量生产安全的数据库会话
    格式:数据库类型+驱动://用户名:密码@主机:端口/数据库名?字符集
    # 数据库连接配置 DATABASE_URL = "mysql+aiomysql://root:1130688@localhost:3306/fastapi_orm?charset=utf8mb4"
# 数据库连接配置
DATABASE_URL = "mysql+aiomysql://root:密码@localhost:3306/fastapi_orm?charset=utf8mb4"
database_engine = create_async_engine(
    DATABASE_URL,
    echo=True,        # 打印执行的 SQL 语句(开发调试用)
    pool_size=10,     # 连接池大小(保持 10 个长连接)
    max_overflow=20   # 最大溢出连接(高峰期最多再开 20 个)
)

作用:生产安全的、可直接使用的异步数据库会话

  • engine(引擎) 是 SQLAlchemy 的 “数据库入口”
    负责:建立连接、管理连接池、执行 SQL 底层通信
  • echo=True:开发时能看到执行了什么 SQL,方便调试
  • pool_size 和 max_overflow 是高并发优化配置
  1. async_sessionmaker 是异步会话工厂
    作用:生产安全的、可直接使用的异步数据库会话,每次请求 API 都会从这里拿一个会话
AsyncSession_as = async_sessionmaker(
    bind=database_engine,   # 绑定到上面创建的数据库引擎
    class_=AsyncSession,     # 使用异步会话类
    expire_on_commit=False  # 提交后不销毁对象(非常重要)
)
  1. FastAPI 依赖项
    用于给每个接口提供数据库会话,是最关键的部分
#异步会话获取器
async def get_data():
    async with AsyncSession_as()as A:#创建一个异步数据库会话,命名为 A,并自动管理生命周期
        try:
            yield A#把数据库会话交给路由使用(这就是依赖注入)
            await A.commit()#操作完成后提交事务(保存修改)
        except Exception:
            await A.rollback()#如果操作过程中发生异常,回滚事务(撤销修改)
            raise#把错误抛出去,让 FastAPI 处理
        finally:
            await A.close()#无论是否发生异常,都关闭会话(释放资源)

3.2.models --ORM模型(数据表)

Base.py 是所有数据库模型的父类,Book.py 里的模型会继承它,自动拥有创建时间、修改时间这两个公共字段。

3.2.1.Base.py

from datetime import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
class Base(DeclarativeBase):
    create_time: Mapped[datetime] = mapped_column(
        DateTime,
        insert_default=func.now(),
        default=func.now,
        comment="创建时间"
    )
    update_time: Mapped[datetime] = mapped_column(
        DateTime,
        insert_default=func.now(),
        default=func.now,
        onupdate=func.now(),
        comment="修改时间"
    )

这是你的公共模型基类,所有数据表模型都继承它,就不用每个表都重复写创建时间、修改时间了。
字段参数详解:

  1. create_time: Mapped[datetime]
    类型注解:这个字段是 datetime 类型,对应数据库 DATETIME。
  • insert_default=func.now():插入数据时,自动用数据库的 NOW() 函数生成当前时间,作为创建时间。
  • default=func.now:Python 侧的默认值,ORM 新增对象时,如果不指定时间,会自动赋值。
  • comment=“创建时间”:给数据库列加注释,方便维护。
  1. update_time: Mapped[datetime]
  • 前三个参数和 create_time 一样,保证新增数据时也会写入初始修改时间。
  • onupdate=func.now():更新数据时,自动用数据库的 NOW() 函数更新这个字段,实现 “修改时间自动更新”。

3.2.2.Book.py

from sqlalchemy import Integer, String,Float
from sqlalchemy.orm import mapped_column, Mapped
from models.Base import Base
class Book(Base):
        __tablename__ = "book"#数据库表名:book
        id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")
        bookname: Mapped[str] = mapped_column(String(255), comment="书名")
        author: Mapped[str] = mapped_column(String(255), comment="作者")
        price: Mapped[float] = mapped_column(Float, comment="价格")
        publisher: Mapped[str] = mapped_column(String(255), comment="出版社")

3.3. routers/–路由与接口逻辑

3.3.1.导入部分

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func

from config.config import get_data    # 数据库会话依赖
from models.Book import Book        # 图书模型
from schema.jiben import updata_book # Pydantic 验证模型
  • APIRouter:用来拆分路由,让代码更整洁
  • Depends(get_data):自动注入数据库会话,不用手动开关
  • func:用来做聚合查询(计数、平均值等)

3.3.2.查询接口大全

1.查询–获取图书
@orm_fast.get("/find/aa")
async def find_aa(session:AsyncSession=Depends(get_data)):
    result=await session.execute(select(Book))
    return result.scalars().all()

select(Book) = SELECT * FROM book
scalars().all():返回所有数据列表
如果获取单条数据:scalars().first() 或者get(模型类,主键值)

2.条件查询

比较判断:==; >; <; >=; <=等
条件查询:价格大于等于200的Book

@orm_fast.get("/find/bc")
async def find_bc(session:AsyncSession=Depends(get_data)):
    result=await session.execute(select(Book).where(Book.price>=200))
    return result.scalars().all()

多条件:&=AND且;|;~;in_()等。

3.模糊查询–like()

%:零个、一个或多个字符
_:一个单个字符
模糊查询:#作者以‘曹’开头且价格大于100的Book

@orm_fast.get("/find/be")
async def find_be(session:AsyncSession=Depends(get_data)):
    result=await session.execute(select(Book).where(Book.author.like("曹%")& Book.price>100))
    result1=result.scalars().all()
    return result1
4.聚合查询–func.方法()

聚合计算:func.方法(模型类.属性)
count:统计行数量
avg:求平均值
max:求最大值
min:求最小值
sum:求和

@orm_fast.get("/find/bf")
async def find_bf(session:AsyncSession=Depends(get_data)):
    total=await session.execute(select(func.count(Book.id)))
    avg=await session.execute(select(func.avg(Book.price)))
    return {
        "total":total.scalar_one_or_none(),
        "average":avg.scalar_one_or_none()
    }
5.分页查询
@orm_fast.get("/find/bg")
async def find_bg(page:int=1,page_size:int=2,bd:AsyncSession=Depends(get_data)):
    skip=(page-1)*page_size
    stmit=select(Book).offset(skip).limit(page_size)
    result=await bd.execute(stmit)
    return result.scalars().all()

offset():跳过多少条
limit():取多少条

3.3.3.增删改接口

1.添加图书
@orm_fast.post("/find/bh")
async def find_bh(user_book:updata_book,bd:AsyncSession=Depends(get_data)):
    book_obj=Book(**user_book.__dict__)
    bd.add(book_obj)
    await bd.commit()
    return book_obj

user_book.__dict__:把 Pydantic 数据解包给 ORM 模型;自动生成 create_timeupdate_time

2.修改图书
@orm_fast.put("/find/update_bi")
async def find_update_bi(user_book:updata_book,bd:AsyncSession=Depends(get_data)):
    u_books=await bd.get(Book,user_book.id)
    if not u_books:
        return {"msg":"图书不存在"}
    # 赋值更新
    u_books.bookname=user_book.bookname
    u_books.author=user_book.author
    u_books.price=user_book.price
    u_books.publisher=user_book.publisher
    await bd.commit()
    return {"msg":"更新成功"}

重点:update_time 会自动更新!(基类里的 onupdate=func.now ())

3.删除图书
@orm_fast.delete("/find/delete_bi/{id}")
async def find_delete_bi(id:int,bd:AsyncSession=Depends(get_data)):
    u_books=await bd.get(Book,id)
    if not u_books:
        return {"msg":"图书不存在"}
    await bd.delete(u_books)
    await bd.commit()
    return {"msg":"删除成功"}

3.4.schema/jiben.py–数据校验

from pydantic import BaseModel

class updata_book(BaseModel):
    id:int
    bookname:str
    author:str
    price:float
    publisher:str

这是一个数据验证类,专门用来接收前端传过来的图书信息。
它会自动校验前端传的参数类型对不对:

  • id 必须是整数
  • bookname、author、publisher 必须是字符串
  • price 必须是浮点数(小数)
    作用:
  1. 前端传 JSON → 自动转成 Python 对象
  2. 类型不对直接报错(FastAPI 自动返回 422 错误)
  3. 自动生成接口文档(Swagger UI 能直接看到字段)

3.5.main.py–项目入口

from contextlib import asynccontextmanager#导入异步上下文管理器,专门用来写项目启动 / 关闭时要做的事

from fastapi import FastAPI

from config.config import database_engine
from models.Base import Base
from routers.urls import orm_fast


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 项目【启动时】执行一次
    async with database_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 项目运行中……(接收请求、跑接口)
    yield

    # 项目【关闭时】执行一次
    await database_engine.dispose()

app=FastAPI(lifespan=lifespan)
app.include_router(orm_fast,prefix="/orm",tags=["orm_APP"])

四、分层架构的小总结

用了分层架构之后,我最大的感受就是:

  1. 代码不乱了,找 bug 快多了
  2. 后续加功能(比如分页、登录),直接在对应层加就行
  3. 写课程作业、练手项目都能用,结构也很规范

这套模板把 ORM、外键、增删改查都包含了,新手入门完全够用!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐