前言

        在 Web 开发中,数据库操作是核心环节。原生 SQL 编写繁琐、易出错且存在 SQL 注入风险,而ORM(对象关系映射) 技术可以让我们通过操作 Python 对象的方式与数据库交互,无需直接编写 SQL,大幅提升开发效率。

        FastAPI 中推荐使用SQLAlchemy(异步版)作为 ORM 工具,它功能强大、灵活性高,是企业级 Python 项目的首选,本文将基于SQLAlchemy[asyncio] + MySQL讲解异步 ORM 的实战使用。

1.ORM的核心优势

  • 告别原生 SQL:通过 Python 对象操作数据库,代码更简洁易读
  • 避免 SQL 注入:ORM 自动做参数转义,从根源上防止注入攻击
  • 代码复用性高:减少重复的 SQL 语句,统一数据库操作逻辑
  • 自动管理连接:处理数据库连接、会话和事务,无需手动管理
  • 跨数据库兼容:修改数据库类型时,无需大幅修改业务代码

2.主流ORM工具对比

3.SQLAIchemy异步版实战步骤

步骤 1:安装依赖

需要安装异步版 SQLAlchemy 和 MySQL 的异步驱动aiomysql:

# 安装异步SQLAlchemy
pip install sqlalchemy[asyncio] -i https://mirrors.aliyun.com/pypi/simple/
# 安装MySQL异步驱动
pip install aiomysql -i https://mirrors.aliyun.com/pypi/simple/

步骤 2:创建数据库(提前准备)

确保 MySQL 服务已启动;

连接 MySQL 并创建项目数据库:

按下Win+R,输入cmd回车打开命令提示符

# 假设 MySQL 安装在 C 盘 Program Files 下(根据你的实际路径修改)

cd C:\Program Files\MySQL\MySQL Server 8.0\bin

输入连接命令,回车后输入MySQL 密码(安装时设置的root密码)

输入密码后回车,如下图所示说明连接成功

接下来创建数据库

-- 创建数据库,指定字符集为utf8mb4(支持emoji)
CREATE DATABASE IF NOT EXISTS fastapi_orm DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci;

打开Pycharm

如下图就是连接成功了

步骤3:初始化FastAPI项目,创建异步数据库引擎

数据库引擎是 SQLAlchemy 与数据库的连接核心,异步版使用create_async_engine创建,同时配置连接池参数(提升数据库访问性能)。

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from contextlib import asynccontextmanager
 
# 数据库配置
DB_URL = "mysql+aiomysql://root:你的密码@localhost:3306/fastapi_orm?charset=utf8mb4"
 
# 全局异步引擎(避免重复创建)
async_engine: AsyncEngine | None = None
# 全局会话工厂(延迟初始化)
AsyncSessionLocal: async_sessionmaker | None = None
 
# 项目生命周期管理器(替代@app.on_event,优化资源释放)
@asynccontextmanager
async def lifespan(app: FastAPI):
    global async_engine
    # 应用启动:初始化异步引擎
    async_engine = create_async_engine(
        DB_URL,
        echo=True,  # 输出SQL日志,开发阶段开启,生产阶段关闭
        pool_size=10,  # 连接池常驻连接数
        max_overflow=20,  # 连接池允许的额外连接数
        pool_pre_ping=True  # 检查连接有效性,避免无效连接
    )
    print("应用启动,数据库引擎初始化完成")
    yield
    # 应用关闭:优雅释放引擎和连接池
    if async_engine:
        await async_engine.dispose()
    print("应用关闭,数据库连接已释放")
 
# 初始化FastAPI,绑定生命周期
app = FastAPI(lifespan=lifespan)

步骤4:定义ORM模型(映射数据库表)

ORM 的核心是模型类与数据库表的一一映射,先定义基础模型类(包含所有表的公共字段,如创建时间、更新时间),再定义业务模型类(如书籍表、用户表)。

需要使用DeclarativeBase作为基类,Mapped约定属性类型,mapped_column映射数据库字段。

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import DateTime, func, String, Float
from datetime import datetime
 
# 基础模型类:包含公共的创建时间、更新时间
class Base(DeclarativeBase):
    # 创建时间:默认当前时间,仅插入时生效
    create_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), comment="创建时间")
    # 更新时间:默认当前时间,更新时自动刷新
    update_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
 
# 书籍模型类:映射book表
class Book(Base):
    __tablename__ = "book"  # 数据库表名
    __table_args__ = {'extend_existing': True}  # 允许重复加载模型,避免报错
 
    # 字段映射:主键id、书名、作者、价格、出版社
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID")
    name: Mapped[str] = mapped_column(String(255), comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float, comment="价格")
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
 
# 建表函数:基于模型类创建数据库表
async def create_tables():
    if not async_engine:
        raise RuntimeError("数据库引擎未初始化")
    # 开启事务,创建所有未存在的表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
 
# 注意:需要在lifespan的应用启动阶段调用建表函数
# 改造lifespan的yield上方代码:
# await create_tables()
# print("数据库表创建完成")

改造后的lifespan:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global async_engine
    async_engine = create_async_engine(DB_URL, echo=True, pool_size=10, max_overflow=20, pool_pre_ping=True)
    await create_tables()  # 调用建表函数
    print("应用启动,数据库引擎初始化完成,表创建成功")
    yield
    if async_engine:
        await async_engine.dispose()
    print("应用关闭,数据库连接已释放")

步骤5:创建数据库会话依赖项,注入到路由

数据库会话(Session)是 ORM 操作数据库的核心对象,通过依赖注入将会话注入到路由中,实现会话的统一管理(创建、使用、关闭、事务回滚)。

from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from fastapi import Depends
 
# 创建异步会话工厂,绑定数据库引擎
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine, # 绑定数据库引擎
    class_=AsyncSession, # 指定会话类
    expire_on_commit=False  # 提交后会话对象不过期,避免重复查询
    autoflush=True  # 开启自动刷新,确保数据及时写入数据库
)
 
# 定义数据库会话依赖项:路由中通过Depends调用
async def get_db():
    if not AsyncSessionLocal:
        raise RuntimeError("数据库会话工厂未初始化")
    db = AsyncSessionLocal()
    try:
        yield db  # 将会话传递给路由
        await db.commit()  # 无异常则提交事务
    except Exception as e:
        await db.rollback()  # 有异常则回滚事务
        raise e
    finally:
        await db.close()  # 无论是否异常,最终关闭会话

改造后的lifespan:

# 项目生命周期管理器(替代@app.on_event,优化资源释放)
@asynccontextmanager
async def lifespan(app: FastAPI):
    global async_engine, AsyncSessionLocal
    # 应用启动:初始化异步引擎
    async_engine = create_async_engine(
        DB_URL,
        echo=True,  # 输出SQL日志,开发阶段开启,生产阶段关闭
        pool_size=10,  # 连接池常驻连接数
        max_overflow=20,  # 连接池允许的额外连接数
        pool_pre_ping=True  # 检查连接有效性,避免无效连接
    )
    await create_tables()  # 调用建表函数
    # 引擎初始化完成后,再创建会话工厂
    AsyncSessionLocal = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=True  # 开启自动刷新,确保数据及时写入数据库
    )
    print("应用启动,数据库引擎初始化完成,表创建成功")
    yield
    # 应用关闭:优雅释放引擎和连接池
    if async_engine:
        await async_engine.dispose()
    print("应用关闭,数据库连接已释放")
 
# 初始化FastAPI,绑定生命周期
app = FastAPI(lifespan=lifespan)

步骤6:ORM核心操作(CRUD增删改查)

基于上述配置,我们可以通过操作 Python 对象实现数据库的增删改查,无需编写任何原生 SQL,以下是书籍表的完整 CRUD 实战。

前置:定义 Pydantic 模型(请求体校验)

FastAPI 中推荐使用 Pydantic 做请求参数校验,定义与 Book 模型对应的 Pydantic 类,接收前端传入的参数:

from pydantic import BaseModel
 
# 书籍新增/更新的参数校验模型
class BookBase(BaseModel):
    name: str
    author: str
    price: float
    publisher: str
 
# 支持Pydantic模型解析ORM对象
class BookResponse(BookBase):
    id: int
    create_time: datetime
    update_time: datetime
    # 查询数据库得到 ORM 对象后,直接返回给前端,无需手动转字典
    class Config:
        orm_mode = True # V1 写法(V2 用 from_attributes = True)

1.查询:基础查询、条件查询

查询是数据库操作中最常用的场景,SQLAlchemy 提供了丰富的查询语法,核心是select()函数,配合where()/offset()/limit()实现复杂查询。

from fastapi import APIRouter
from sqlalchemy import select, func
from typing import List
 
# 1. 查询所有书籍(直接使用app定义路由)
@app.get("/book/all", response_model=List[BookResponse])
async def get_all_books(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Book))
    return result.scalars().all()
 
# 2. 根据ID查询单本书籍(直接使用app定义路由)
@app.get("/book/{book_id}", response_model=BookResponse | dict)
async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)):
    book = await db.get(Book, book_id)
    if not book:
        return {"code": 404, "message": "书籍不存在"}
    return book
 
# 3. 条件查询:价格小于指定值 + 作者模糊查询(直接使用app定义路由)
@app.get("/book/search/condition", response_model=List[BookResponse])
async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)):
    # 模糊匹配 like
    # result =  await db.execute(select(Book).where(Book.author.like("曹_")))
    # & 逻辑与 ; , 作用同于& ; | 逻辑或
    # result =  await db.execute(select(Book).where((Book.author.like("曹%"))&(Book.price>100)))
    # result =  await db.execute(select(Book).where((Book.author.like("曹%")),(Book.price>100)))
    # result =  await db.execute(select(Book).where((Book.author.like("曹%"))|(Book.price>100)))
    # 需求:书籍id列表,数据库里面的id在这个列表中 就返回
    # id_list = [1,3,5,7]
    # result = await db.execute(select(Book).where(Book.id.in_(id_list)))
    result = await db.execute(
        select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%"))
    )
    return result.scalars().all()

2.新增:创建对象 -> 添加到会话 -> 提交事务

核心步骤:通过 Pydantic 模型创建 ORM 对象 → 用db.add()添加到会话 → 提交事务(依赖项中自动提交)。

# 新增书籍
# 用户输入图书信息(id、书名、作者、价格、出版社) -> 新增
# 用户输入 -> 参数 -> 请求体
@book_router.post("/add", response_model=BookResponse)
async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)):
    # 获取 book 参数 ,创建图书对象(__dict__ 返回 book 对象的属性字典)
    book_obj = Book(**book.dict())
    # 添加到数据库会话
    db.add(book_obj)
    await db.commit()
    return book_obj

3.更新: 先查询 -> 重新赋值 -> 提交事务

FastAPI 中 ORM 更新的核心是先查后改:先根据主键查询到对象,再对对象的属性重新赋值,提交事务后自动同步到数据库。

from fastapi import HTTPException
 
# 7. 更新书籍信息
# 先查在改
# 设计思路:路径参数 书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社)
@app.put("/book/update/{book_id}", response_model=BookResponse)
async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)):
    db_book = await db.get(Book, book_id)
    # 未找到抛出异常
    if not db_book:
        raise HTTPException(status_code=404, detail="书籍不存在")
    # 更新字段(update_time会自动刷新)
    db_book.name = book.name
    db_book.author = book.author
    db_book.price = book.price
    db_book.publisher = book.publisher
    # 刷新会话,确保字段更新
    await db.flush()
    await db.refresh(db_book)
    return db_book

4.删除:先查询 → 删除对象 → 提交事务

核心步骤:先查询对象 → 用db.delete()删除对象 → 提交事务,完成数据库删除。

# 8. 删除书籍(直接使用app定义路由)
@app.delete("/book/delete/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
    db_book = await db.get(Book, book_id)
    # 未找到抛出异常
    if not db_book:
        raise HTTPException(status_code=404, detail="书籍不存在")
    await db.delete(db_book)
    await db.commit()
    return {"code": 200, "message": "书籍删除成功"}

4.ORM操作核心总结

完整代码

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, async_sessionmaker, AsyncSession
from contextlib import asynccontextmanager
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import DateTime, func, String, Float, select
from datetime import datetime
from pydantic import BaseModel
from typing import List
 
# 数据库配置
DB_URL = "mysql+aiomysql://root:458362@localhost:3306/fastapi_orm?charset=utf8mb4"
 
# 全局异步引擎和会话工厂
async_engine: AsyncEngine | None = None
AsyncSessionLocal: async_sessionmaker | None = None
 
# 基础模型类
class Base(DeclarativeBase):
    create_time: Mapped[datetime] = mapped_column(
        DateTime,
        comment="创建时间",
        default=datetime.now()
    )
    update_time: Mapped[datetime] = mapped_column(
        DateTime,
        comment="更新时间",
        default=datetime.now(),
        onupdate=datetime.now()
    )
 
# 书籍模型类
class Book(Base):
    __tablename__ = "book"
    __table_args__ = {'extend_existing': True}
 
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID")
    name: Mapped[str] = mapped_column(String(255), comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float, comment="价格")
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
 
# 建表函数
async def create_tables():
    if not async_engine:
        raise RuntimeError("数据库引擎未初始化")
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
 
# 项目生命周期管理器
@asynccontextmanager
async def lifespan(app: FastAPI):
    global async_engine, AsyncSessionLocal
    async_engine = create_async_engine(
        DB_URL,
        echo=True,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True
    )
    await create_tables()
    AsyncSessionLocal = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=True
    )
    print("应用启动,数据库引擎初始化完成,表创建成功")
    yield
    if async_engine:
        await async_engine.dispose()
    print("应用关闭,数据库连接已释放")
 
# 初始化FastAPI
app = FastAPI(lifespan=lifespan)
 
# 数据库会话依赖项
async def get_db():
    if not AsyncSessionLocal:
        raise RuntimeError("数据库会话工厂未初始化")
    db = AsyncSessionLocal()
    try:
        yield db
        await db.commit()
    except Exception as e:
        await db.rollback()
        raise e
    finally:
        await db.close()
 
# Pydantic模型
class BookBase(BaseModel):
    name: str
    author: str
    price: float
    publisher: str
 
class BookResponse(BookBase):
    id: int
    create_time: datetime
    update_time: datetime
 
    class Config:
        from_attributes = True
        orm_mode = True
 
# 根路由
@app.get("/")
async def root():
    return {"message": "FastAPI ORM项目运行成功", "docs_url": "/docs"}
 
# 查询所有书籍
@app.get("/book/all", response_model=List[BookResponse])
async def get_all_books(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Book))
    return result.scalars().all()
 
# 根据ID查询单本书籍
@app.get("/book/{book_id}", response_model=BookResponse | dict)
async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)):
    book = await db.get(Book, book_id)
    if not book:
        return {"code": 404, "message": "书籍不存在"}
    return book
 
# 条件查询:价格小于指定值 + 作者模糊查询
@app.get("/book/search/condition", response_model=List[BookResponse])
async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%"))
    )
    return result.scalars().all()
 
# 新增书籍
@app.post("/book/add", response_model=BookResponse)
async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)):
    book_obj = Book(**book.__dict__)
    db.add(book_obj)
    await db.commit()
    return book_obj
 
# 更新书籍信息
@app.put("/book/update/{book_id}", response_model=BookResponse)
async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)):
    db_book = await db.get(Book, book_id)
    if not db_book:
        raise HTTPException(status_code=404, detail="书籍不存在")
    db_book.name = book.name
    db_book.author = book.author
    db_book.price = book.price
    db_book.publisher = book.publisher
    await db.flush()
    await db.refresh(db_book)
    return db_book
 
# 删除书籍
@app.delete("/book/delete/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
    db_book = await db.get(Book, book_id)
    if not db_book:
        raise HTTPException(status_code=404, detail="书籍不存在")
    await db.delete(db_book)
    await db.commit()
    return {"code": 200, "message": "书籍删除成功"}

SQLAlchemy 异步版操作数据库的核心流程可概括为3 步建环境,4 步做操作:

建环境(项目初始化)

  1. 安装依赖:sqlalchemy[asyncio] + 对应数据库的异步驱动;
  2. 创建异步引擎:create_async_engine,配置连接池和日志;
  3. 定义模型类:继承DeclarativeBase,实现表与对象的映射;

数据操作(CRUD)

  • 创建会话工厂:async_sessionmaker,绑定引擎;
  • 定义会话依赖项:get_db(),统一管理会话的创建和释放;
  • 路由中注入会话:db: AsyncSession = Depends(get_db);
  • 操作对象:通过select()/add()/delete() + 对象属性赋值实现 CRUD。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐