Python零基础学习Day3——ORM
前言
在 Web 开发中,数据库操作是核心环节。原生 SQL 编写繁琐、易出错且存在 SQL 注入风险,而ORM(对象关系映射) 技术可以让我们通过操作 Python 对象的方式与数据库交互,无需直接编写 SQL,大幅提升开发效率。
FastAPI 中推荐使用SQLAlchemy(异步版)作为 ORM 工具,它功能强大、灵活性高,是企业级 Python 项目的首选,本文将基于SQLAlchemy[asyncio] + MySQL讲解异步 ORM 的实战使用。
1.ORM的核心优势
- 告别原生 SQL:通过 Python 对象操作数据库,代码更简洁易读
- 避免 SQL 注入:ORM 自动做参数转义,从根源上防止注入攻击
- 代码复用性高:减少重复的 SQL 语句,统一数据库操作逻辑
- 自动管理连接:处理数据库连接、会话和事务,无需手动管理
- 跨数据库兼容:修改数据库类型时,无需大幅修改业务代码
2.主流ORM工具对比

3.SQLAIchemy异步版实战步骤
步骤 1:安装依赖
需要安装异步版 SQLAlchemy 和 MySQL 的异步驱动aiomysql:
# 安装异步SQLAlchemy
pip install sqlalchemy[asyncio] -i https://mirrors.aliyun.com/pypi/simple/
# 安装MySQL异步驱动
pip install aiomysql -i https://mirrors.aliyun.com/pypi/simple/
步骤 2:创建数据库(提前准备)
确保 MySQL 服务已启动;

连接 MySQL 并创建项目数据库:
按下Win+R,输入cmd回车打开命令提示符

# 假设 MySQL 安装在 C 盘 Program Files 下(根据你的实际路径修改)
cd C:\Program Files\MySQL\MySQL Server 8.0\bin

输入连接命令,回车后输入MySQL 密码(安装时设置的root密码)

输入密码后回车,如下图所示说明连接成功

接下来创建数据库
-- 创建数据库,指定字符集为utf8mb4(支持emoji)
CREATE DATABASE IF NOT EXISTS fastapi_orm DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci;


打开Pycharm




如下图就是连接成功了

步骤3:初始化FastAPI项目,创建异步数据库引擎
数据库引擎是 SQLAlchemy 与数据库的连接核心,异步版使用create_async_engine创建,同时配置连接池参数(提升数据库访问性能)。
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from contextlib import asynccontextmanager
# 数据库配置
DB_URL = "mysql+aiomysql://root:你的密码@localhost:3306/fastapi_orm?charset=utf8mb4"
# 全局异步引擎(避免重复创建)
async_engine: AsyncEngine | None = None
# 全局会话工厂(延迟初始化)
AsyncSessionLocal: async_sessionmaker | None = None
# 项目生命周期管理器(替代@app.on_event,优化资源释放)
@asynccontextmanager
async def lifespan(app: FastAPI):
global async_engine
# 应用启动:初始化异步引擎
async_engine = create_async_engine(
DB_URL,
echo=True, # 输出SQL日志,开发阶段开启,生产阶段关闭
pool_size=10, # 连接池常驻连接数
max_overflow=20, # 连接池允许的额外连接数
pool_pre_ping=True # 检查连接有效性,避免无效连接
)
print("应用启动,数据库引擎初始化完成")
yield
# 应用关闭:优雅释放引擎和连接池
if async_engine:
await async_engine.dispose()
print("应用关闭,数据库连接已释放")
# 初始化FastAPI,绑定生命周期
app = FastAPI(lifespan=lifespan)
步骤4:定义ORM模型(映射数据库表)
ORM 的核心是模型类与数据库表的一一映射,先定义基础模型类(包含所有表的公共字段,如创建时间、更新时间),再定义业务模型类(如书籍表、用户表)。
需要使用DeclarativeBase作为基类,Mapped约定属性类型,mapped_column映射数据库字段。
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import DateTime, func, String, Float
from datetime import datetime
# 基础模型类:包含公共的创建时间、更新时间
class Base(DeclarativeBase):
# 创建时间:默认当前时间,仅插入时生效
create_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), comment="创建时间")
# 更新时间:默认当前时间,更新时自动刷新
update_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
# 书籍模型类:映射book表
class Book(Base):
__tablename__ = "book" # 数据库表名
__table_args__ = {'extend_existing': True} # 允许重复加载模型,避免报错
# 字段映射:主键id、书名、作者、价格、出版社
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID")
name: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float, comment="价格")
publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
# 建表函数:基于模型类创建数据库表
async def create_tables():
if not async_engine:
raise RuntimeError("数据库引擎未初始化")
# 开启事务,创建所有未存在的表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 注意:需要在lifespan的应用启动阶段调用建表函数
# 改造lifespan的yield上方代码:
# await create_tables()
# print("数据库表创建完成")
改造后的lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
global async_engine
async_engine = create_async_engine(DB_URL, echo=True, pool_size=10, max_overflow=20, pool_pre_ping=True)
await create_tables() # 调用建表函数
print("应用启动,数据库引擎初始化完成,表创建成功")
yield
if async_engine:
await async_engine.dispose()
print("应用关闭,数据库连接已释放")
步骤5:创建数据库会话依赖项,注入到路由
数据库会话(Session)是 ORM 操作数据库的核心对象,通过依赖注入将会话注入到路由中,实现会话的统一管理(创建、使用、关闭、事务回滚)。
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from fastapi import Depends
# 创建异步会话工厂,绑定数据库引擎
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, # 绑定数据库引擎
class_=AsyncSession, # 指定会话类
expire_on_commit=False # 提交后会话对象不过期,避免重复查询
autoflush=True # 开启自动刷新,确保数据及时写入数据库
)
# 定义数据库会话依赖项:路由中通过Depends调用
async def get_db():
if not AsyncSessionLocal:
raise RuntimeError("数据库会话工厂未初始化")
db = AsyncSessionLocal()
try:
yield db # 将会话传递给路由
await db.commit() # 无异常则提交事务
except Exception as e:
await db.rollback() # 有异常则回滚事务
raise e
finally:
await db.close() # 无论是否异常,最终关闭会话
改造后的lifespan:
# 项目生命周期管理器(替代@app.on_event,优化资源释放)
@asynccontextmanager
async def lifespan(app: FastAPI):
global async_engine, AsyncSessionLocal
# 应用启动:初始化异步引擎
async_engine = create_async_engine(
DB_URL,
echo=True, # 输出SQL日志,开发阶段开启,生产阶段关闭
pool_size=10, # 连接池常驻连接数
max_overflow=20, # 连接池允许的额外连接数
pool_pre_ping=True # 检查连接有效性,避免无效连接
)
await create_tables() # 调用建表函数
# 引擎初始化完成后,再创建会话工厂
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=True # 开启自动刷新,确保数据及时写入数据库
)
print("应用启动,数据库引擎初始化完成,表创建成功")
yield
# 应用关闭:优雅释放引擎和连接池
if async_engine:
await async_engine.dispose()
print("应用关闭,数据库连接已释放")
# 初始化FastAPI,绑定生命周期
app = FastAPI(lifespan=lifespan)
步骤6:ORM核心操作(CRUD增删改查)
基于上述配置,我们可以通过操作 Python 对象实现数据库的增删改查,无需编写任何原生 SQL,以下是书籍表的完整 CRUD 实战。
前置:定义 Pydantic 模型(请求体校验)
FastAPI 中推荐使用 Pydantic 做请求参数校验,定义与 Book 模型对应的 Pydantic 类,接收前端传入的参数:
from pydantic import BaseModel
# 书籍新增/更新的参数校验模型
class BookBase(BaseModel):
name: str
author: str
price: float
publisher: str
# 支持Pydantic模型解析ORM对象
class BookResponse(BookBase):
id: int
create_time: datetime
update_time: datetime
# 查询数据库得到 ORM 对象后,直接返回给前端,无需手动转字典
class Config:
orm_mode = True # V1 写法(V2 用 from_attributes = True)
1.查询:基础查询、条件查询
查询是数据库操作中最常用的场景,SQLAlchemy 提供了丰富的查询语法,核心是select()函数,配合where()/offset()/limit()实现复杂查询。
from fastapi import APIRouter
from sqlalchemy import select, func
from typing import List
# 1. 查询所有书籍(直接使用app定义路由)
@app.get("/book/all", response_model=List[BookResponse])
async def get_all_books(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Book))
return result.scalars().all()
# 2. 根据ID查询单本书籍(直接使用app定义路由)
@app.get("/book/{book_id}", response_model=BookResponse | dict)
async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)):
book = await db.get(Book, book_id)
if not book:
return {"code": 404, "message": "书籍不存在"}
return book
# 3. 条件查询:价格小于指定值 + 作者模糊查询(直接使用app定义路由)
@app.get("/book/search/condition", response_model=List[BookResponse])
async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)):
# 模糊匹配 like
# result = await db.execute(select(Book).where(Book.author.like("曹_")))
# & 逻辑与 ; , 作用同于& ; | 逻辑或
# result = await db.execute(select(Book).where((Book.author.like("曹%"))&(Book.price>100)))
# result = await db.execute(select(Book).where((Book.author.like("曹%")),(Book.price>100)))
# result = await db.execute(select(Book).where((Book.author.like("曹%"))|(Book.price>100)))
# 需求:书籍id列表,数据库里面的id在这个列表中 就返回
# id_list = [1,3,5,7]
# result = await db.execute(select(Book).where(Book.id.in_(id_list)))
result = await db.execute(
select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%"))
)
return result.scalars().all()
2.新增:创建对象 -> 添加到会话 -> 提交事务
核心步骤:通过 Pydantic 模型创建 ORM 对象 → 用db.add()添加到会话 → 提交事务(依赖项中自动提交)。
# 新增书籍
# 用户输入图书信息(id、书名、作者、价格、出版社) -> 新增
# 用户输入 -> 参数 -> 请求体
@book_router.post("/add", response_model=BookResponse)
async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)):
# 获取 book 参数 ,创建图书对象(__dict__ 返回 book 对象的属性字典)
book_obj = Book(**book.dict())
# 添加到数据库会话
db.add(book_obj)
await db.commit()
return book_obj
3.更新: 先查询 -> 重新赋值 -> 提交事务
FastAPI 中 ORM 更新的核心是先查后改:先根据主键查询到对象,再对对象的属性重新赋值,提交事务后自动同步到数据库。
from fastapi import HTTPException
# 7. 更新书籍信息
# 先查在改
# 设计思路:路径参数 书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社)
@app.put("/book/update/{book_id}", response_model=BookResponse)
async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)):
db_book = await db.get(Book, book_id)
# 未找到抛出异常
if not db_book:
raise HTTPException(status_code=404, detail="书籍不存在")
# 更新字段(update_time会自动刷新)
db_book.name = book.name
db_book.author = book.author
db_book.price = book.price
db_book.publisher = book.publisher
# 刷新会话,确保字段更新
await db.flush()
await db.refresh(db_book)
return db_book
4.删除:先查询 → 删除对象 → 提交事务
核心步骤:先查询对象 → 用db.delete()删除对象 → 提交事务,完成数据库删除。
# 8. 删除书籍(直接使用app定义路由)
@app.delete("/book/delete/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
db_book = await db.get(Book, book_id)
# 未找到抛出异常
if not db_book:
raise HTTPException(status_code=404, detail="书籍不存在")
await db.delete(db_book)
await db.commit()
return {"code": 200, "message": "书籍删除成功"}
4.ORM操作核心总结
完整代码
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, async_sessionmaker, AsyncSession
from contextlib import asynccontextmanager
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import DateTime, func, String, Float, select
from datetime import datetime
from pydantic import BaseModel
from typing import List
# 数据库配置
DB_URL = "mysql+aiomysql://root:458362@localhost:3306/fastapi_orm?charset=utf8mb4"
# 全局异步引擎和会话工厂
async_engine: AsyncEngine | None = None
AsyncSessionLocal: async_sessionmaker | None = None
# 基础模型类
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(
DateTime,
comment="创建时间",
default=datetime.now()
)
update_time: Mapped[datetime] = mapped_column(
DateTime,
comment="更新时间",
default=datetime.now(),
onupdate=datetime.now()
)
# 书籍模型类
class Book(Base):
__tablename__ = "book"
__table_args__ = {'extend_existing': True}
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID")
name: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float, comment="价格")
publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
# 建表函数
async def create_tables():
if not async_engine:
raise RuntimeError("数据库引擎未初始化")
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 项目生命周期管理器
@asynccontextmanager
async def lifespan(app: FastAPI):
global async_engine, AsyncSessionLocal
async_engine = create_async_engine(
DB_URL,
echo=True,
pool_size=10,
max_overflow=20,
pool_pre_ping=True
)
await create_tables()
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=True
)
print("应用启动,数据库引擎初始化完成,表创建成功")
yield
if async_engine:
await async_engine.dispose()
print("应用关闭,数据库连接已释放")
# 初始化FastAPI
app = FastAPI(lifespan=lifespan)
# 数据库会话依赖项
async def get_db():
if not AsyncSessionLocal:
raise RuntimeError("数据库会话工厂未初始化")
db = AsyncSessionLocal()
try:
yield db
await db.commit()
except Exception as e:
await db.rollback()
raise e
finally:
await db.close()
# Pydantic模型
class BookBase(BaseModel):
name: str
author: str
price: float
publisher: str
class BookResponse(BookBase):
id: int
create_time: datetime
update_time: datetime
class Config:
from_attributes = True
orm_mode = True
# 根路由
@app.get("/")
async def root():
return {"message": "FastAPI ORM项目运行成功", "docs_url": "/docs"}
# 查询所有书籍
@app.get("/book/all", response_model=List[BookResponse])
async def get_all_books(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Book))
return result.scalars().all()
# 根据ID查询单本书籍
@app.get("/book/{book_id}", response_model=BookResponse | dict)
async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)):
book = await db.get(Book, book_id)
if not book:
return {"code": 404, "message": "书籍不存在"}
return book
# 条件查询:价格小于指定值 + 作者模糊查询
@app.get("/book/search/condition", response_model=List[BookResponse])
async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%"))
)
return result.scalars().all()
# 新增书籍
@app.post("/book/add", response_model=BookResponse)
async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)):
book_obj = Book(**book.__dict__)
db.add(book_obj)
await db.commit()
return book_obj
# 更新书籍信息
@app.put("/book/update/{book_id}", response_model=BookResponse)
async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)):
db_book = await db.get(Book, book_id)
if not db_book:
raise HTTPException(status_code=404, detail="书籍不存在")
db_book.name = book.name
db_book.author = book.author
db_book.price = book.price
db_book.publisher = book.publisher
await db.flush()
await db.refresh(db_book)
return db_book
# 删除书籍
@app.delete("/book/delete/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
db_book = await db.get(Book, book_id)
if not db_book:
raise HTTPException(status_code=404, detail="书籍不存在")
await db.delete(db_book)
await db.commit()
return {"code": 200, "message": "书籍删除成功"}
SQLAlchemy 异步版操作数据库的核心流程可概括为3 步建环境,4 步做操作:
建环境(项目初始化)
- 安装依赖:sqlalchemy[asyncio] + 对应数据库的异步驱动;
- 创建异步引擎:create_async_engine,配置连接池和日志;
- 定义模型类:继承DeclarativeBase,实现表与对象的映射;
数据操作(CRUD)
- 创建会话工厂:async_sessionmaker,绑定引擎;
- 定义会话依赖项:get_db(),统一管理会话的创建和释放;
- 路由中注入会话:db: AsyncSession = Depends(get_db);
- 操作对象:通过select()/add()/delete() + 对象属性赋值实现 CRUD。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)