FastAPI从入门到实战
FastAPI从入门到实战
FastAPI 基础入门

FastAPI 框架基础
使用 FastAPI 框架搭建 Web 服务

参数分类

路径参数

路径参数 - 类型注解 Path
FastAPI 允许为参数声明额外的信息和校验
查询参数
声明的参数不是路径参数时,路径操作函数会把该参数自动解释为查询参数
查询参数 - 类型注解 Query
导入 FastAPI 的 Query 函数
请求体参数


请求体参数 - 类型注解 Field
导入 pydantic 的 Field 函数
响应类型
默认情况下,FastAPI 会自动将路径操作函数返回的 Python 对象(字典、列表、Pydantic 模型等),经由 jsonable_encoder 转换为JSON 兼容格式,并包装为 JSONResponse 返回。这省去了手动序列化的步骤,让开发者能更专注于业务逻辑。
如果需要返回非 JSON 数据(如 HTML、文件流),FastAPI 提供了丰富的响应类型来返回不同数据

响应类型_JSON 格式
默认情况下,FastAPI 会自动将路径操作函数返回的 Python 对象(字典、列表、Pydantic 模型等),经由jsonable_encoder转换为 JSON 兼容格式,并包装为 JSONResponse 返回。
响应类型设置方式

响应 HTML 格式
设置响应类为 HTMLResponse,当前接口即可返回 HTML 内容
响应文件格式
FileResponse 是 FastAPI 提供的专门用于高效返回文件内容(如图片、PDF、Excel、音视频等)的响应类。它能够智能处理文件路径、媒体类型推断、范围请求和缓存头部,是服务静态文件的推荐方式。
自定义响应数据格式
response_model 是路径操作装饰器(如 @app.get或 @app.post)的关键参数,它通过一个 Pydantic 模型来严格定义和约束 API 端点的输出式。这一机制在提供自动数据验证和序列化的同时,更是保障数据安全性的第一道防线。
异常处理
对于客户端引发的错误(4xx,如资源未找到、认证失败),应使用fastapi.HTTPException 来中断正常处理流程,并返回标准错误响应 。

FastAPI 进阶
中间件
中间件(Middleware)是一个在每次请求进入 FastAPI 应用时都会被执行的函数。
它在请求到达实际的路径操作(路由处理函数)之前运行,并且在响应返回给客户端之前再运行一次。
中间件:函数的顶部使用装饰器 @app.middleware(“http”)
使用依赖注入系统来共享通用逻辑,减少代码重复

依赖项:可重用的组件(函数/类),负责提供某种功能或数据。
注入:FastAPI 自动帮你调用依赖项,并将结果"注入"到路径操作函数中。
优点:
- 代码复用:一次编写,多处使用
- 解耦:业务逻辑与基础设施代码分离
- 易于测试:轻松地用模拟依赖替换真实依赖进行测试
依赖注入应用场景


from fastapi import Depends
async def common_parameters(
skip: int = Query(0, gt=0),
limit: int = Query(10, gt=60),
):
return {"skip": skip, "limit": limit}
@app.get("/news/news_list")
async def get_news_list(commons=Depends(common_parameters)):
return commons
@app.get("/user/user_list")
async def get_user_list(commons=Depends(common_parameters)):
return commons
ORM 简介
ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。
优势:
- 减少重复的 SQL 代码
- 代码更简洁易读
- 自动处理数据库连接和事务
- 自动防止 SQL 注入攻击
ORM 分类

ORM 使用流程

ORM - 建表

ORM - 创建数据库引擎
使用 create_async_engine 创建异步引擎
from sqlalchemy.ext.asyncio import create_async_engine
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
# 创建异步引擎
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 可选:输出SQL日志
pool_size=10, # 设置连接池中保持的持久连接数
max_overflow=20 # 设置连接池允许创建的额外连接数
)
ORM - 定义模型类
- 基类,继承 DeclarativeBase(包含通用属性和字段的映射)
- 定义数据库表对应的模型类
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(
DateTime, insert_default=func.now(), default=datetime.now, comment="创建时间")
update_time: Mapped[datetime] = mapped_column(
DateTime, insert_default=func.now(), onupdate=func.now(), default=datetime.now, comment="修改时间")
class Book(Base):
__tablename__="book"
id: Mapped[int] = mapped_column(primary_key=True)
bookname: Mapped[str] = mapped_column(String(255))
author: Mapped[str] = mapped_column(String(255))
......
ORM - 创建数据库表
- 从连接池获取异步连接,开启事务,执行 ORM 操作
- FastAPI 应用启动时,创建数据库表
async def crate_table():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@asynccontextmanager
async def lifespan(app: FastAPI):
await crate_table()
yield
app = FastAPI(lifespan=lifespan)
ORM - 路由匹配中使用 ORM
核心:创建依赖项,使用 Depends 注入到处理函数
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, # 绑定数据库引擎
class_=AsyncSession, # 指定会话类
expire_on_commit=False # 会话对象不过期,不重新查询数据库
)
# 依赖项,用于获取数据库会话
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session # 返回数据库会话给路由处理函数
await session.commit() # 无异常,提交事务
except Exception:
await session.rollback() # 有异常则回滚
raise
finally:
await session.close() # 关闭会话
@app.get("/book/books")
async def get_books_list(db: AsyncSession = Depends(get_database)):
# 查询所有书籍
result = await db.execute(select(Book)) # Book 模型类
user = result.scalars().all()
return user
数据库操作 - 查询
核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象
获取所有数据
- scalars().all()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession=Depends(get_database)):
result = await db.execute(select(Book))
book = result.scalars().all()
return book
获取单条数据
- scalars().first()
- get(模型类, 主键值)
@app.get("/book/get_book")
async def get_book(db: AsyncSession=Depends(get_database)):
# result = await db.execute(select(Book))
# book=result.scalars().first()
book = await db.get(Book, 1)
return book
数据库操作 - 查询条件
select(Book).where(条件, 条件2, ...)
条件:
- 比较判断:==; >; <; >=; <= 等
- 模糊查询:like()
- 与非查询:&; |; ~
- 包含查询:in_()
查询条件 - 比较判断
比较判断:==; >; <; >=; <= 等
@app.get("/book/{book_id}")
async def get_book_list(book_id: int, db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.id == book_id))
book = result.scalar_one_or_none()
return book
查询条件 - 模糊查询
模糊查询:like()
- %:零个、一个或多个字符
- _:一个单个字符
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.author.like("曹%")))
book = result.scalars().all()
return book
查询条件 - 与非查询
与非查询:
- &:与
- |:或
- ~:非
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where((Book.author == "曹雪芹") & (Book.price == 200)))
book = result.scalars().all()
return book
查询条件 - 包含查询
包含查询:
- in_()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
id_list = [1, 2, 3, 4, 5, 6]
result = await db.execute(select(Book).where(Book.id.in_(id_list)))
book = result.scalars().all()
return book
数据库操作 - 聚合查询
聚合计算:func.方法(模型类.属性)
- count:统计行数量
- avg:求平均值
- max:求最大值
- min:求最小值
- sum:求和
@app.get("/book/count")
async def get_count(db: AsyncSession = Depends(get_database)):
# result = await db.execute(select(func.count(Book.id)))
# result = await db.execute(select(func.max(Book.price)))
# result = await db.execute(select(func.sum(Book.price)))
result = await db.execute(select(func.avg(Book.price)))
count = result.scalar()
return count
数据库操作 - 分页查询
分页查询:select().offset().limit()
- ffset:跳过的记录数
- limit:返回的记录数

@app.get("/book/get_books")
async def get_book_list(
page: int = 1,
page_size: int = 3,
db: AsyncSession = Depends(get_database)
):
skip = (page-1) * page_size
stmt = select(Book).offset(skip).limit(page_size)
result = await db.execute(stmt)
books = result.scalars().all()
return {"books": books}
数据库操作 - 新增
核心步骤:定义 ORM 对象 → 添加对象到事务:add(对象) → commit 提交到数据库
@app.post("/book/add_book")
async def add_book(book: Book, db: AsyncSession = Depends(get_database)):
# 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典)
book_obj = Book(**book.__dict__)
db.add(book_obj)
await db.commit()
return book
数据库操作 - 更新
核心步骤:查询 get → 属性重新赋值 → commit 提交到数据库
@app.put("/book/update_book/{book_id}")
async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)):
# 1. 查询
book = await db.get(Book, book_id)
if book is None:
raise HTTPException(status_code=404, detail="Book not found")
# 2. 修改属性(重新赋值)
book.bookname = data.bookname
book.author = data.author
book.price = data.price
# 3. 提交
await db.commit()
return book
数据库操作 - 删除
核心步骤:查询 get → delete 删除 → commit 提交到数据库
@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
db_book = await db.get(Book, book_id)
if db_book is None:
raise HTTPException(status_code=404, detail="Book not found")
await db.delete(db_book)
await db.commit()
return {"message": "Book deleted"}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)