7天掌握FastAPI-ORM
目录
2.3.5.2获取单条数据:scalars().first() ;get(模型类, 主键值)
2.3.1ORM 简介
ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。
优势:
- 减少重复的 SQL 代码
- 代码更简洁易读
- 自动处理数据库连接和事务
- 自动防止 SQL 注入攻击
2.3.2ORM 分类-ORM 使用流程
Windows安装方式
pip install sqlalchemy aiomysql
mac安装方式
pip install "sqlalchemy[asyncio]" aiomysql
2.3.3ORM - 建表
建表流程,在建表之前还需要建一个数据库create database xx;
2.3.3.1创建数据库引擎
使用 create_async_engine 创建异步引擎
from sqlalchemy.ext.asyncio import create_async_engine
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
# 创建异步引擎
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选: 输出SQL日志
pool_size=10, # 设置连接池中保持的持久连接数
max_overflow=20 # 设置连接池允许创建的额外连接数
)
先使用创建数据库语句创建一个数据库
create database fastapi_first;
2.3.3.2定义模型类
基类,继承 DeclarativeBase(包含通用属性和字段的映射)
定义数据库表对应的模型类
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(
DateTime, insert_default=func.now(), default=datetime.now, comment="创建时间")
update_time: Mapped[datetime] = mapped_column(
DateTime, insert_default=func.now(), onupdate=func.now(), default=datetime.now, comment="修改时间")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True)
bookname: Mapped[str] = mapped_column(String(255))
author: Mapped[str] = mapped_column(String(255))
......
2.3.3.3创建数据库表
从连接池获取异步连接,开启事务,执行 ORM 操作
FastAPI 应用启动时,创建数据库表
async def create_tables():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("startup")
async def startup_event():
await create_tables()
2.3.3.4完整的代码如下:
from datetime import datetime
from fastapi import FastAPI
from sqlalchemy import DateTime, func, String, Float
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
app = FastAPI()
# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选,输出SQL日志
pool_size=10, # 设置连接池活跃的连接数
max_overflow=20 # 允许额外的连接数
)
# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
update_time: Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="更新时间")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255),comment="书名")
author: Mapped[str] = mapped_column(String(255),comment="作者")
price: Mapped[float] = mapped_column(Float,comment="价格")
publisher: Mapped[str] = mapped_column(String(255),comment="出版社")
# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
# 获取异步引擎,创建事务-建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建
@app.on_event("startup")
async def startup_event():
await create_tables()
@app.get("/")
async def root():
return {"message": "Hello World"}
接着启动FastAPI程序
然后登录数据库,查看表是否成功创建。
注意:因为这里的on_event 在 FastAPI 的新版本中已经被弃用了。应该使用 lifespan 事件处理器来替代。
2.3.3.5替换后的代码
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy import DateTime, func, String, Float
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理器
- yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
- yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
"""
# 启动时创建数据库表
await create_tables()
yield
# 如果有关闭时需要执行的清理操作,可以在这里添加
app = FastAPI(lifespan=lifespan)
# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选,输出SQL日志
pool_size=10, # 设置连接池活跃的连接数
max_overflow=20 # 允许额外的连接数
)
# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
# 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
comment="创建时间")
# 更新时间字段:插入和更新时自动设置为当前时间
update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
onupdate=func.now(), comment="更新时间")
class Book(Base):
"""书籍表模型类"""
__tablename__ = "book" # 数据库表名
id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") # 主键
bookname: Mapped[str] = mapped_column(String(255), comment="书名") # 书名
author: Mapped[str] = mapped_column(String(255), comment="作者") # 作者
price: Mapped[float] = mapped_column(Float, comment="价格") # 价格
publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 出版社
# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
"""
创建数据库表
使用异步引擎创建所有继承自 Base 的模型类对应的表
"""
# 获取异步引擎,创建事务-建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建
@app.get("/")
async def root():
"""根路径接口"""
return {"message": "Hello World"}
再次执行这个程序,然后查看是否成功创建
2.3.3.6练习
需求:使用 SQLAlchemy ORM 创建用户表,包含字段如下:用户 id、用户名、密码、创建时间、更新时间
- 用户 id:主键
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy import DateTime, func, String
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理器
- yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
- yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
"""
# 启动时创建数据库表
await create_tables()
yield
# 如果有关闭时需要执行的清理操作,可以在这里添加
app = FastAPI(lifespan=lifespan)
# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选,输出SQL日志
pool_size=10, # 设置连接池活跃的连接数
max_overflow=20 # 允许额外的连接数
)
# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;用户表:id、用户名、密码
class Base(DeclarativeBase):
# 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
comment="创建时间")
# 更新时间字段:插入和更新时自动设置为当前时间
update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
onupdate=func.now(), comment="更新时间")
class User(Base):
"""用户表模型类"""
__tablename__ = "user" # 数据库表名
id: Mapped[int] = mapped_column(primary_key=True, comment="用户id") # 主键
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, comment="用户名") # 用户名
password: Mapped[str] = mapped_column(String(255), nullable=False, comment="密码") # 密码
# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
"""
创建数据库表
使用异步引擎创建所有继承自 Base 的模型类对应的表
"""
# 获取异步引擎,创建事务-建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建
@app.get("/")
async def root():
"""根路径接口"""
return {"message": "Hello World"}
2.3.4路由匹配中使用 ORM
我们有一个搜索的功能,用户点击搜索的时候,完成的是数据库查询的工作。
那么当我们点击注册的功能时,需要在数据库表中新出入一条数据。
那么在一个项目当时都是通过一个接口来实现数据库的增删改查的。接下来就用到了路由匹配中使用ORM了
核心:创建依赖项,使用 Depends 注入到处理函数
2.3.4.1创建方式如下:
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, # 绑定数据库引擎
class_=AsyncSession, # 指定会话类
expire_on_commit=False # 会话对象不过期,不重新查询数据库
)
# 依赖项,用于获取数据库会话
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session # 返回数据库会话给路由处理函数
await session.commit() # 无异常,提交事务
except Exception:
await session.rollback() # 有异常则回滚
raise
finally:
await session.close() # 关闭会话
@app.get("/book/books")
async def get_book_list(
db: AsyncSession = Depends(get_database)
):
# 查询所有书籍
result = await db.execute(select(Book)) # Book 模型类
user = result.scalars().all()
return user
2.3.4.2直接查询book表中的数据
from datetime import datetime
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from sqlalchemy import DateTime, func, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理器
- yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
- yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
"""
# 启动时创建数据库表
await create_tables()
yield
# 如果有关闭时需要执行的清理操作,可以在这里添加
app = FastAPI(lifespan=lifespan)
# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选,输出SQL日志
pool_size=10, # 设置连接池活跃的连接数
max_overflow=20 # 允许额外的连接数
)
# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
# 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
comment="创建时间")
# 更新时间字段:插入和更新时自动设置为当前时间
update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
onupdate=func.now(), comment="更新时间")
class Book(Base):
"""书籍表模型类"""
__tablename__ = "book" # 数据库表名
id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") # 主键
bookname: Mapped[str] = mapped_column(String(255), comment="书名") # 书名
author: Mapped[str] = mapped_column(String(255), comment="作者") # 作者
price: Mapped[float] = mapped_column(Float, comment="价格") # 价格
publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 出版社
# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
"""
创建数据库表
使用异步引擎创建所有继承自 Base 的模型类对应的表
"""
# 获取异步引擎,创建事务-建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建
@app.get("/")
async def root():
"""根路径接口"""
return {"message": "Hello World"}
# 需求:查询功能的接口,查询图书->依赖注入:创建依赖项获取数据库会话 + Depends 注入路由处理函数
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
bind = async_engine, # 绑定数据库引擎
class_=AsyncSession, # 指定会话类
expire_on_commit=False # 提交后不关闭会话,不会重新查询数据库
)
# 创建依赖项:获取数据库会话
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session # 返回数据会话给路由处理函数
await session.commit() # 提交事务
except Exception as e:
await session.rollback() # 有异常则回滚事务
raise e
finally:
await session.close() # 关闭会话
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
"""
就是这里为什么传递的依赖是get_database,而不是get_database()函数呢?
✅ 不加括号:传递函数本身,让 FastAPI 在合适的时机调用
❌ 加括号:立即执行函数,传递的是返回值(这会导致错误)
这是 FastAPI 依赖注入的核心机制,通过传递函数引用,框架可以控制何时调用、如何管理生命周期(比如处理 yield 前后的逻辑)。
"""
# 查询
result = await db.execute(select(Book))
book = result.scalars().all()
return book
这里是空的,因为我们还没有往数据库表中插入数据
在PyCharm中连接数据库,然后使用图形化工具插入一条数据
然后重新进入到接口文档,刷新,再次发起请求进行查询
2.3.5ORM数据库操作
2.3.5.1查询
核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象
- 获取所有数据:
scalars().all()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession=Depends(get_database)):
result = await db.execute(select(Book))
book = result.scalars().all()
return book
2.3.5.2获取单条数据:scalars().first() ;get(模型类, 主键值)
@app.get("/book/get_book")
async def get_book(db: AsyncSession=Depends(get_database)):
# result = await db.execute(select(Book))
# book=result.scalars().first()
book = await db.get(Book, 1)
return book
2.3.5.3先插入几条书籍的数据
INSERT INTO fastapi_first.book (bookname, author, price, publisher, create_time, update_time)
VALUES
('活着', '余华', 39.0, '作家出版社', NOW(), NOW()),
('平凡的世界', '路遥', 78.0, '北京十月文艺出版社', NOW(), NOW()),
('百年孤独', '加西亚·马尔克斯', 56.8, '南海出版公司', NOW(), NOW()),
('解忧杂货店', '东野圭吾', 42.0, '南海出版公司', NOW(), NOW());
2.3.5.4更改这个方法get_book_list
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# 查询所有图书
result = await db.execute(select(Book))
books = result.scalars().all()
return books
2.3.5.5查询全部数据、查询第一条数据
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# 查询指定的图书
result = await db.execute(select(Book))
# books = result.scalars().all() # 获取所有结果
books =result.scalars().first()
return books
2.3.5.6查询指定的图书
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# 查询指定的图书
# result = await db.execute(select(Book))
# books = result.scalars().all() # 获取所有结果
# books =result.scalars().first()
book = await db.get(Book,4)
return book
2.3.5.7查询条件
select(Book).where(条件, 条件2, ...)
条件:
- 比较判断:==; >; <; >=; <= 等
- 模糊查询:like()
- 与非查询:&; |; ~
- 包含查询:in_()
2.3.5.7.1比较判断
- 比较判断:==; >; <; >=; <= 等
# 需求:路径参数 书籍id
@app.get("/book/get_book/{book_id}")
async def get_book_list(book_id,db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.id == book_id))
book = result.scalar_one_or_none()
return book
# 需求:条件 价格大于等于200
@app.get("/book/get_book_price/{price}")
async def get_book_price(price,db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.price >= price))
books = result.scalars().all()
return books
2.3.5.7.2模糊查询
模糊查询:like()
- %:零个、一个或多个字符
- _:一个单个字符
# 需求:作者以 曹 开头 % _
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.author.like("曹%")))
book = result.scalars().all()
return book
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.author.like("曹_"))) # 这个只能查询两个字并且姓曹的人
book = result.scalars().all()
return book
2.3.5.7.3与非查询
与非查询:
- &:与
- |:或
- ~:非
# 需求:作者以 曹 开头 % _
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# result = await db.execute(select(Book).where(Book.author.like("曹%")))
result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100)))
result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100)))
result = await db.execute(select(Book).where(~Book.author.like("曹%")))
book = result.scalars().all()
return book
2.3.5.7.4包含查询
包含查询:
- in_()
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
# result = await db.execute(select(Book).where(Book.author.like("曹%")))
# result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100)))
# result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100)))
# result = await db.execute(select(Book).where(~Book.author.like("曹%")))
# 需求:书籍id列表,数据库里面的id如果在 id列表里面,就返回
id_list = [1,2,3]
result = await db.execute(select(Book).where(Book.id.in_(id_list)))
book = result.scalars().all()
return book
2.3.5.7.5聚合查询
聚合计算:func.方法(模型类.属性)
- count:统计行数量
- avg:求平均值
- max:求最大值
- min:求最小值
- sum:求和
@app.get("/book/count")
async def get_count(db: AsyncSession = Depends(get_database)):
# 聚合查询 select(func.方法名(模型类.属性))
# result = await db.execute(select(func.count(Book.id)))
# result = await db.execute(select(func.max(Book.price)))
# result = await db.execute(select(func.sum(Book.price)))
result = await db.execute(select(func.avg(Book.price)))
count = result.scalar() # 用来提取一个数值->标量值
return count
2.3.5.7.6分页查询
分页查询:select().offset().limit()
- offset:跳过的记录数
- limit:返回的记录数
offset 值 = (当前页码 - 1) * 每页数量 limit
@app.get("/book/get_books")
async def get_book_list(
page: int = 1, # 需要查询哪一页
page_size: int = 3, # 每页展示多少条数据
db: AsyncSession = Depends(get_database)
):
# 跳过的多少条数据
skip = (page-1) * page_size
# 查询【第 page 页】的那一页数据,offset 跳过的记录数;limit 每页的记录数
stmt = select(Book).offset(skip).limit(page_size)
result = await db.execute(stmt)
books = result.scalars().all()
return {"books": books}
2.3.5.8小结
核心思路:
- select() → db.execute() → 从 ORM 对象获取数据 → 响应结果
- db.get(模型类, 主键值)
从 ORM 对象获取数据的方式
- 获取所有数据
-
- scalars().all()
- 获取单条数据
-
- scalars().first(): 提取第一个数据
- scalar_one_or_none(): 提取一个或 null
- scalar(): 提取标量值(配合聚合查询使用)
2.3.5.8.1新增
核心步骤:定义 ORM 对象 → 添加对象到事务:add(对象) → commit 提交到数据库
from pydantic import BaseModel # 导入基类
# 需求:用户输入图书信息(id、书名、作者、价格、出版社)->新增
# 用书输入->参数->请求体
# 定义需要新增的数据类
"""
BookBase(继承BaseModel)是 Pydantic 请求体模型,用来接收、校验前端传参,相当于 Java 里的 DTO。
Book(继承DeclarativeBase)是 SQLAlchemy ORM 模型,对应数据库表,相当于 Java 里的 Entity。
新增时,先把请求体数据转成 ORM 对象,再通过 ORM 方法写入数据库。
"""
class BookBase(BaseModel):
id: int
bookname: str
author: str
price: float
publisher: str
@app.post("/book/add_book")
async def add_book(book: BookBase, db: AsyncSession = Depends(get_database)):
# ORM对象->add->commit
# 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典)
book_obj = Book(**book.__dict__) # 先把book通过__dict__转化为字典,然后**解包,对字典进行展开
db.add(book_obj)
await db.commit()
return book
2.3.5.8.2更新
核心步骤:查询 get → 属性重新赋值 → commit 提交到数据库
from pydantic import BaseModel # 导入基类
from fastapi import FastAPI,Depends,HTTPException
# 需求:修改图书信息:先查再改
# 设计思路:路径参数书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社)
class BookUpdate(BaseModel):
bookname: str
author: str
price: float
publisher: str
@app.put("/book/update_book/{book_id}") # 通过主键id来找,同时也需要准备一个请求体参数:BookUpdate
async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)):
# 1. 查询
book = await db.get(Book, book_id)
# 如果未找到,抛出异常
if book is None:
raise HTTPException(status_code=404, detail="Book not found")
# 2. 修改属性(重新赋值)
book.bookname = data.bookname
book.author = data.author
book.price = data.price
book.publisher = data.publisher
# 3. 提交
await db.commit()
return book
2.3.5.8.3删除
核心步骤:查询 get → delete 删除 → commit 提交到数据库
@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
# 先查再删 提交
db_book = await db.get(Book, book_id)
# 如果查不到,抛出异常
if db_book is None:
raise HTTPException(status_code=404, detail="Book not found")
# 查到谁就删除谁
await db.delete(db_book)
await db.commit()
return {"message": "Book deleted"}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)