目录

2.3.1ORM 简介

2.3.2ORM 分类-ORM 使用流程

2.3.3ORM - 建表

2.3.3.1创建数据库引擎

2.3.3.2定义模型类

2.3.3.3创建数据库表

2.3.3.4完整的代码如下:

2.3.3.5替换后的代码

2.3.3.6练习

2.3.4路由匹配中使用 ORM

2.3.4.1创建方式如下:

2.3.4.2直接查询book表中的数据

2.3.5ORM数据库操作

2.3.5.1查询

2.3.5.2获取单条数据:scalars().first() ;get(模型类, 主键值)

2.3.5.3先插入几条书籍的数据

2.3.5.4更改这个方法get_book_list

2.3.5.5查询全部数据、查询第一条数据

2.3.5.6查询指定的图书

2.3.5.7查询条件

2.3.5.7.1比较判断

2.3.5.7.2模糊查询

2.3.5.7.3与非查询

2.3.5.7.4包含查询

2.3.5.7.5聚合查询

2.3.5.7.6分页查询

2.3.5.8小结

2.3.5.8.1新增

2.3.5.8.2更新

2.3.5.8.3删除


2.3.1ORM 简介

ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。

优势:

  • 减少重复的 SQL 代码
  • 代码更简洁易读
  • 自动处理数据库连接和事务
  • 自动防止 SQL 注入攻击
2.3.2ORM 分类-ORM 使用流程

Windows安装方式

pip install sqlalchemy aiomysql

mac安装方式

pip install "sqlalchemy[asyncio]" aiomysql
2.3.3ORM - 建表

建表流程,在建表之前还需要建一个数据库create database xx;

2.3.3.1创建数据库引擎

使用 create_async_engine 创建异步引擎

from sqlalchemy.ext.asyncio import create_async_engine

ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
# 创建异步引擎
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,       # 可选: 输出SQL日志
    pool_size=10,    # 设置连接池中保持的持久连接数
    max_overflow=20  # 设置连接池允许创建的额外连接数
)

先使用创建数据库语句创建一个数据库

create database fastapi_first;
2.3.3.2定义模型类

基类,继承 DeclarativeBase(包含通用属性和字段的映射)

定义数据库表对应的模型类

class Base(DeclarativeBase):
    create_time: Mapped[datetime] = mapped_column(
        DateTime, insert_default=func.now(), default=datetime.now, comment="创建时间")
    update_time: Mapped[datetime] = mapped_column(
        DateTime, insert_default=func.now(), onupdate=func.now(), default=datetime.now, comment="修改时间")


class Book(Base):
    __tablename__ = "book"

    id: Mapped[int] = mapped_column(primary_key=True)
    bookname: Mapped[str] = mapped_column(String(255))
    author: Mapped[str] = mapped_column(String(255))
    ......
2.3.3.3创建数据库表

从连接池获取异步连接,开启事务,执行 ORM 操作

FastAPI 应用启动时,创建数据库表

async def create_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


@app.on_event("startup")
async def startup_event():
    await create_tables()
2.3.3.4完整的代码如下:
from datetime import datetime

from fastapi import FastAPI
from sqlalchemy import DateTime, func, String, Float
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

app = FastAPI()

# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # 可选,输出SQL日志
    pool_size=10,  # 设置连接池活跃的连接数
    max_overflow=20  # 允许额外的连接数
)


# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
    create_time: Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
    update_time: Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="更新时间")


class Book(Base):
    __tablename__ = "book"
    id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
    bookname: Mapped[str] = mapped_column(String(255),comment="书名")
    author: Mapped[str] = mapped_column(String(255),comment="作者")
    price: Mapped[float] = mapped_column(Float,comment="价格")
    publisher: Mapped[str] = mapped_column(String(255),comment="出版社")

# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
    # 获取异步引擎,创建事务-建表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)  # Base 模型类的元数据创建

@app.on_event("startup")
async def startup_event():
    await create_tables()

@app.get("/")
async def root():
    return {"message": "Hello World"}

接着启动FastAPI程序

然后登录数据库,查看表是否成功创建。

注意:因为这里的on_event 在 FastAPI 的新版本中已经被弃用了。应该使用 lifespan 事件处理器来替代。

2.3.3.5替换后的代码
from datetime import datetime
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy import DateTime, func, String, Float
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理器
    - yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
    - yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
    """
    # 启动时创建数据库表
    await create_tables()
    yield
    # 如果有关闭时需要执行的清理操作,可以在这里添加


app = FastAPI(lifespan=lifespan)

# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # 可选,输出SQL日志
    pool_size=10,  # 设置连接池活跃的连接数
    max_overflow=20  # 允许额外的连接数
)


# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
    # 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
    create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  comment="创建时间")
    # 更新时间字段:插入和更新时自动设置为当前时间
    update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  onupdate=func.now(), comment="更新时间")


class Book(Base):
    """书籍表模型类"""
    __tablename__ = "book"  # 数据库表名
    id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")  # 主键
    bookname: Mapped[str] = mapped_column(String(255), comment="书名")  # 书名
    author: Mapped[str] = mapped_column(String(255), comment="作者")  # 作者
    price: Mapped[float] = mapped_column(Float, comment="价格")  # 价格
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")  # 出版社


# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
    """
    创建数据库表
    使用异步引擎创建所有继承自 Base 的模型类对应的表
    """
    # 获取异步引擎,创建事务-建表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)  # Base 模型类的元数据创建


@app.get("/")
async def root():
    """根路径接口"""
    return {"message": "Hello World"}

再次执行这个程序,然后查看是否成功创建

2.3.3.6练习

需求:使用 SQLAlchemy ORM 创建用户表,包含字段如下:用户 id、用户名、密码、创建时间、更新时间

  • 用户 id:主键
from datetime import datetime
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy import DateTime, func, String
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理器
    - yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
    - yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
    """
    # 启动时创建数据库表
    await create_tables()
    yield
    # 如果有关闭时需要执行的清理操作,可以在这里添加


app = FastAPI(lifespan=lifespan)

# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # 可选,输出SQL日志
    pool_size=10,  # 设置连接池活跃的连接数
    max_overflow=20  # 允许额外的连接数
)


# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;用户表:id、用户名、密码
class Base(DeclarativeBase):
    # 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
    create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  comment="创建时间")
    # 更新时间字段:插入和更新时自动设置为当前时间
    update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  onupdate=func.now(), comment="更新时间")


class User(Base):
    """用户表模型类"""
    __tablename__ = "user"  # 数据库表名
    id: Mapped[int] = mapped_column(primary_key=True, comment="用户id")  # 主键
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, comment="用户名")  # 用户名
    password: Mapped[str] = mapped_column(String(255), nullable=False, comment="密码")  # 密码


# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
    """
    创建数据库表
    使用异步引擎创建所有继承自 Base 的模型类对应的表
    """
    # 获取异步引擎,创建事务-建表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)  # Base 模型类的元数据创建


@app.get("/")
async def root():
    """根路径接口"""
    return {"message": "Hello World"}
2.3.4路由匹配中使用 ORM

我们有一个搜索的功能,用户点击搜索的时候,完成的是数据库查询的工作。

那么当我们点击注册的功能时,需要在数据库表中新出入一条数据。

那么在一个项目当时都是通过一个接口来实现数据库的增删改查的。接下来就用到了路由匹配中使用ORM了

核心:创建依赖项,使用 Depends 注入到处理函数

2.3.4.1创建方式如下:
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,    # 绑定数据库引擎
    class_=AsyncSession,  # 指定会话类
    expire_on_commit=False # 会话对象不过期,不重新查询数据库
)

# 依赖项,用于获取数据库会话
async def get_database():
    async with AsyncSessionLocal() as session:
        try:
            yield session  # 返回数据库会话给路由处理函数
            await session.commit()  # 无异常,提交事务
        except Exception:
            await session.rollback()  # 有异常则回滚
            raise
        finally:
            await session.close()  # 关闭会话


@app.get("/book/books")
async def get_book_list(
    db: AsyncSession = Depends(get_database)
):
    # 查询所有书籍
    result = await db.execute(select(Book))  # Book 模型类
    user = result.scalars().all()
    return user
2.3.4.2直接查询book表中的数据
from datetime import datetime
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends
from sqlalchemy import DateTime, func, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理器
    - yield 之前:应用启动时执行(如:数据库初始化、连接池创建等)
    - yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等)
    """
    # 启动时创建数据库表
    await create_tables()
    yield
    # 如果有关闭时需要执行的清理操作,可以在这里添加


app = FastAPI(lifespan=lifespan)

# 1、创建异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # 可选,输出SQL日志
    pool_size=10,  # 设置连接池活跃的连接数
    max_overflow=20  # 允许额外的连接数
)


# 2、定义模型类: 基类 + 表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
    # 创建时间字段:插入时自动设置为当前时间,默认值为当前时间
    create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  comment="创建时间")
    # 更新时间字段:插入和更新时自动设置为当前时间
    update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now,
                                                  onupdate=func.now(), comment="更新时间")


class Book(Base):
    """书籍表模型类"""
    __tablename__ = "book"  # 数据库表名
    id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")  # 主键
    bookname: Mapped[str] = mapped_column(String(255), comment="书名")  # 书名
    author: Mapped[str] = mapped_column(String(255), comment="作者")  # 作者
    price: Mapped[float] = mapped_column(Float, comment="价格")  # 价格
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")  # 出版社


# 3、建表:定义函数建表->FastAPI启动时调用建表的函数
async def create_tables():
    """
    创建数据库表
    使用异步引擎创建所有继承自 Base 的模型类对应的表
    """
    # 获取异步引擎,创建事务-建表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)  # Base 模型类的元数据创建


@app.get("/")
async def root():
    """根路径接口"""
    return {"message": "Hello World"}

# 需求:查询功能的接口,查询图书->依赖注入:创建依赖项获取数据库会话 + Depends 注入路由处理函数
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
    bind = async_engine,  # 绑定数据库引擎
    class_=AsyncSession,  # 指定会话类
    expire_on_commit=False  # 提交后不关闭会话,不会重新查询数据库
)

# 创建依赖项:获取数据库会话
async  def get_database():
    async  with AsyncSessionLocal() as session:
        try:
            yield session  # 返回数据会话给路由处理函数
            await session.commit() # 提交事务
        except Exception as e:
            await session.rollback()  # 有异常则回滚事务
            raise e
        finally:
            await session.close()  # 关闭会话


@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    """
    就是这里为什么传递的依赖是get_database,而不是get_database()函数呢?
    ✅ 不加括号:传递函数本身,让 FastAPI 在合适的时机调用
    ❌ 加括号:立即执行函数,传递的是返回值(这会导致错误)
    这是 FastAPI 依赖注入的核心机制,通过传递函数引用,框架可以控制何时调用、如何管理生命周期(比如处理 yield 前后的逻辑)。
    """
    # 查询
    result = await db.execute(select(Book))
    book = result.scalars().all()
    return book

这里是空的,因为我们还没有往数据库表中插入数据

在PyCharm中连接数据库,然后使用图形化工具插入一条数据

然后重新进入到接口文档,刷新,再次发起请求进行查询

2.3.5ORM数据库操作
2.3.5.1查询

核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象

  • 获取所有数据:scalars().all()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession=Depends(get_database)):
    result = await db.execute(select(Book))
    book = result.scalars().all()
    return book
2.3.5.2获取单条数据:scalars().first()get(模型类, 主键值)
@app.get("/book/get_book")
async def get_book(db: AsyncSession=Depends(get_database)):
    # result = await db.execute(select(Book))
    # book=result.scalars().first()
    book = await db.get(Book, 1)
    return book
2.3.5.3先插入几条书籍的数据
INSERT INTO fastapi_first.book (bookname, author, price, publisher, create_time, update_time)
VALUES 
('活着', '余华', 39.0, '作家出版社', NOW(), NOW()),
('平凡的世界', '路遥', 78.0, '北京十月文艺出版社', NOW(), NOW()),
('百年孤独', '加西亚·马尔克斯', 56.8, '南海出版公司', NOW(), NOW()),
('解忧杂货店', '东野圭吾', 42.0, '南海出版公司', NOW(), NOW());
2.3.5.4更改这个方法get_book_list
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # 查询所有图书
    result = await db.execute(select(Book))
    books = result.scalars().all()
    return books
2.3.5.5查询全部数据、查询第一条数据
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # 查询指定的图书
    result = await db.execute(select(Book))
    # books = result.scalars().all() # 获取所有结果
    books =result.scalars().first()
    return books
2.3.5.6查询指定的图书
@app.get("/book/books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # 查询指定的图书
    # result = await db.execute(select(Book))
    # books = result.scalars().all() # 获取所有结果
    # books =result.scalars().first()
    book = await db.get(Book,4)
    return book
2.3.5.7查询条件

select(Book).where(条件, 条件2, ...)

条件:

  • 比较判断:==; >; <; >=; <= 等
  • 模糊查询:like()
  • 与非查询:&; |; ~
  • 包含查询:in_()
2.3.5.7.1比较判断
  • 比较判断:==; >; <; >=; <= 等
# 需求:路径参数 书籍id
@app.get("/book/get_book/{book_id}")
async def get_book_list(book_id,db: AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.id == book_id))
    book = result.scalar_one_or_none()
    return  book
# 需求:条件 价格大于等于200
@app.get("/book/get_book_price/{price}")
async def get_book_price(price,db: AsyncSession = Depends(get_database)):
    result = await  db.execute(select(Book).where(Book.price >= price))
    books = result.scalars().all()
    return books
2.3.5.7.2模糊查询

模糊查询:like()

  • %:零个、一个或多个字符
  • _:一个单个字符
# 需求:作者以 曹 开头 % _
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.author.like("曹%")))
    book = result.scalars().all()
    return  book

@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.author.like("曹_"))) # 这个只能查询两个字并且姓曹的人
    book = result.scalars().all()
    return  book
2.3.5.7.3与非查询

与非查询:

  • &:与
  • |:或
  • ~:非
# 需求:作者以 曹 开头 % _
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # result = await db.execute(select(Book).where(Book.author.like("曹%")))
    result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100)))
    result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100)))
    result = await db.execute(select(Book).where(~Book.author.like("曹%")))
    book = result.scalars().all()
    return  book
2.3.5.7.4包含查询

包含查询:

  • in_()
@app.get("/book/get_book_by_author")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    # result = await db.execute(select(Book).where(Book.author.like("曹%")))
    # result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100)))
    # result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100)))
    # result = await db.execute(select(Book).where(~Book.author.like("曹%")))

    # 需求:书籍id列表,数据库里面的id如果在 id列表里面,就返回
    id_list = [1,2,3]
    result = await db.execute(select(Book).where(Book.id.in_(id_list)))
    book = result.scalars().all()
    return  book
2.3.5.7.5聚合查询

聚合计算:func.方法(模型类.属性)

  • count:统计行数量
  • avg:求平均值
  • max:求最大值
  • min:求最小值
  • sum:求和
@app.get("/book/count")
async def get_count(db: AsyncSession = Depends(get_database)):
    # 聚合查询 select(func.方法名(模型类.属性))
    # result = await db.execute(select(func.count(Book.id)))
    # result = await db.execute(select(func.max(Book.price)))
    # result = await db.execute(select(func.sum(Book.price)))
    result = await db.execute(select(func.avg(Book.price)))
    count = result.scalar()  # 用来提取一个数值->标量值
    return count
2.3.5.7.6分页查询

分页查询:select().offset().limit()

  • offset:跳过的记录数
  • limit:返回的记录数

offset 值 = (当前页码 - 1) * 每页数量 limit

@app.get("/book/get_books")
async def get_book_list(
    page: int = 1,  # 需要查询哪一页
    page_size: int = 3,  # 每页展示多少条数据
    db: AsyncSession = Depends(get_database) 
):
    # 跳过的多少条数据
    skip = (page-1) * page_size
    # 查询【第 page 页】的那一页数据,offset 跳过的记录数;limit 每页的记录数
    stmt = select(Book).offset(skip).limit(page_size)
    result = await db.execute(stmt)
    books = result.scalars().all()
    return {"books": books}
2.3.5.8小结

核心思路:

  • select() → db.execute() → 从 ORM 对象获取数据 → 响应结果
  • db.get(模型类, 主键值)

从 ORM 对象获取数据的方式

  • 获取所有数据
    • scalars().all()
  • 获取单条数据
    • scalars().first(): 提取第一个数据
    • scalar_one_or_none(): 提取一个或 null
    • scalar(): 提取标量值(配合聚合查询使用)
2.3.5.8.1新增

核心步骤:定义 ORM 对象 → 添加对象到事务:add(对象) → commit 提交到数据库

from pydantic import BaseModel # 导入基类


# 需求:用户输入图书信息(id、书名、作者、价格、出版社)->新增
# 用书输入->参数->请求体
# 定义需要新增的数据类
"""
BookBase(继承BaseModel)是 Pydantic 请求体模型,用来接收、校验前端传参,相当于 Java 里的 DTO。
Book(继承DeclarativeBase)是 SQLAlchemy ORM 模型,对应数据库表,相当于 Java 里的 Entity。
新增时,先把请求体数据转成 ORM 对象,再通过 ORM 方法写入数据库。
"""
class BookBase(BaseModel):
    id: int
    bookname: str
    author: str
    price: float
    publisher: str

@app.post("/book/add_book")
async def add_book(book: BookBase, db: AsyncSession = Depends(get_database)):
    # ORM对象->add->commit
    # 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典)
    book_obj = Book(**book.__dict__) # 先把book通过__dict__转化为字典,然后**解包,对字典进行展开
    db.add(book_obj) 
    await db.commit()
    return book
2.3.5.8.2更新

核心步骤:查询 get → 属性重新赋值 → commit 提交到数据库

from pydantic import BaseModel # 导入基类
from fastapi import FastAPI,Depends,HTTPException

# 需求:修改图书信息:先查再改
# 设计思路:路径参数书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社)
class BookUpdate(BaseModel):
    bookname: str
    author: str
    price: float
    publisher: str

@app.put("/book/update_book/{book_id}") # 通过主键id来找,同时也需要准备一个请求体参数:BookUpdate
async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)):
    # 1. 查询
    book = await db.get(Book, book_id)
    # 如果未找到,抛出异常
    if book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    # 2. 修改属性(重新赋值)
    book.bookname = data.bookname
    book.author = data.author
    book.price = data.price
    book.publisher = data.publisher
    # 3. 提交
    await db.commit()
    return book
2.3.5.8.3删除

核心步骤:查询 get → delete 删除 → commit 提交到数据库

@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
    # 先查再删 提交
    db_book = await db.get(Book, book_id)
    # 如果查不到,抛出异常
    if db_book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    # 查到谁就删除谁
    await db.delete(db_book)
    await db.commit()
    return {"message": "Book deleted"}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐