FastAPI 基础入门

在这里插入图片描述

FastAPI 框架基础

使用 FastAPI 框架搭建 Web 服务

在这里插入图片描述

参数分类

在这里插入图片描述

路径参数

在这里插入图片描述

路径参数 - 类型注解 Path

FastAPI 允许为参数声明额外的信息和校验
在这里插入图片描述

查询参数

声明的参数不是路径参数时,路径操作函数会把该参数自动解释为查询参数
在这里插入图片描述

查询参数 - 类型注解 Query

导入 FastAPI 的 Query 函数
在这里插入图片描述

请求体参数

在这里插入图片描述
在这里插入图片描述

请求体参数 - 类型注解 Field

导入 pydantic 的 Field 函数
在这里插入图片描述

响应类型

默认情况下,FastAPI 会自动将路径操作函数返回的 Python 对象(字典、列表、Pydantic 模型等),经由 jsonable_encoder 转换为JSON 兼容格式,并包装为 JSONResponse 返回。这省去了手动序列化的步骤,让开发者能更专注于业务逻辑。

如果需要返回非 JSON 数据(如 HTML、文件流),FastAPI 提供了丰富的响应类型来返回不同数据

在这里插入图片描述

响应类型_JSON 格式

默认情况下,FastAPI 会自动将路径操作函数返回的 Python 对象(字典、列表、Pydantic 模型等),经由jsonable_encoder转换为 JSON 兼容格式,并包装为 JSONResponse 返回。
在这里插入图片描述

响应类型设置方式

在这里插入图片描述

响应 HTML 格式

设置响应类为 HTMLResponse,当前接口即可返回 HTML 内容
在这里插入图片描述

响应文件格式

FileResponse 是 FastAPI 提供的专门用于高效返回文件内容(如图片、PDF、Excel、音视频等)的响应类。它能够智能处理文件路径、媒体类型推断、范围请求和缓存头部,是服务静态文件的推荐方式。
在这里插入图片描述

​自定义响应数据格式

response_model 是路径操作装饰器(如 @app.get或 @app.post)的关键参数,它通过一个 Pydantic 模型来严格定义和约束 API 端点的输出式。这一机制在提供自动数据验证和序列化的同时,更是保障数据安全性的第一道防线。​
在这里插入图片描述

异常处理

对于客户端引发的错误(4xx,如资源未找到、认证失败),应使用fastapi.HTTPException 来中断正常处理流程,并返回标准错误响应 。

在这里插入图片描述

FastAPI 进阶

中间件

中间件(Middleware)是一个在每次请求进入 FastAPI 应用时都会被执行的函数。

它在请求到达实际的路径操作(路由处理函数)之前运行,并且在响应返回给客户端之前再运行一次。
在这里插入图片描述
中间件:函数的顶部使用装饰器 @app.middleware(“http”)
在这里插入图片描述

使用依赖注入系统来共享通用逻辑,减少代码重复

在这里插入图片描述
依赖项:可重用的组件(函数/类),负责提供某种功能或数据。

注入:FastAPI 自动帮你调用依赖项,并将结果"注入"到路径操作函数中。

优点:

  • 代码复用:一次编写,多处使用
  • 解耦:业务逻辑与基础设施代码分离
  • 易于测试:轻松地用模拟依赖替换真实依赖进行测试
依赖注入应用场景

在这里插入图片描述
在这里插入图片描述

from fastapi import Depends

async def common_parameters(
    skip: int = Query(0, gt=0),
    limit: int = Query(10, gt=60),
):
    return {"skip": skip, "limit": limit}

@app.get("/news/news_list")
async def get_news_list(commons=Depends(common_parameters)):
    return commons

@app.get("/user/user_list")
async def get_user_list(commons=Depends(common_parameters)):
    return commons

ORM 简介

ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。

优势:

  • 减少重复的 SQL 代码
  • 代码更简洁易读
  • 自动处理数据库连接和事务
  • 自动防止 SQL 注入攻击

ORM 分类

在这里插入图片描述

ORM 使用流程

在这里插入图片描述

ORM - 建表

在这里插入图片描述

ORM - 创建数据库引擎

使用 create_async_engine 创建异步引擎

from sqlalchemy.ext.asyncio import create_async_engine

ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"

# 创建异步引擎
async_engine = create_async_engine(
	ASYNC_DATABASE_URL,
	echo=True, # 可选:输出SQL日志
	pool_size=10, # 设置连接池中保持的持久连接数
	max_overflow=20 # 设置连接池允许创建的额外连接数
)
ORM - 定义模型类
  1. 基类,继承 DeclarativeBase(包含通用属性和字段的映射)
  2. 定义数据库表对应的模型类
class Base(DeclarativeBase):
	create_time: Mapped[datetime] = mapped_column(
		DateTime, insert_default=func.now(), default=datetime.now, comment="创建时间")
	update_time: Mapped[datetime] = mapped_column(
		DateTime, insert_default=func.now(), onupdate=func.now(), default=datetime.now, comment="修改时间")
		
class Book(Base):
	__tablename__="book"
	
	id: Mapped[int] = mapped_column(primary_key=True)
	bookname: Mapped[str] = mapped_column(String(255))
	author: Mapped[str] = mapped_column(String(255))
......
ORM - 创建数据库表
  1. 从连接池获取异步连接,开启事务,执行 ORM 操作
  2. FastAPI 应用启动时,创建数据库表
async def crate_table():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await crate_table()
    yield

app = FastAPI(lifespan=lifespan)
ORM - 路由匹配中使用 ORM

核心:创建依赖项,使用 Depends 注入到处理函数

# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine, # 绑定数据库引擎
    class_=AsyncSession, # 指定会话类
    expire_on_commit=False # 会话对象不过期,不重新查询数据库
)
# 依赖项,用于获取数据库会话
async def get_database():
    async with AsyncSessionLocal() as session:
        try:
            yield session # 返回数据库会话给路由处理函数
            await session.commit() # 无异常,提交事务
        except Exception:
            await session.rollback() # 有异常则回滚
            raise
        finally:
            await session.close() # 关闭会话
@app.get("/book/books")
async def get_books_list(db: AsyncSession = Depends(get_database)):
    # 查询所有书籍
    result = await db.execute(select(Book))  # Book 模型类
    user = result.scalars().all()
    return user

数据库操作 - 查询

核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象

获取所有数据

  • scalars().all()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession=Depends(get_database)):
	result = await db.execute(select(Book))
	book = result.scalars().all()
	return book

获取单条数据

  • scalars().first()
  • get(模型类, 主键值)
@app.get("/book/get_book")
async def get_book(db: AsyncSession=Depends(get_database)):
	# result = await db.execute(select(Book))
	# book=result.scalars().first()
	book = await db.get(Book, 1)
	return book
数据库操作 - 查询条件
select(Book).where(条件, 条件2, ...)

条件:

  • 比较判断:==; >; <; >=; <= 等
  • 模糊查询:like()
  • 与非查询:&; |; ~
  • 包含查询:in_()

查询条件 - 比较判断

比较判断:==; >; <; >=; <= 等

@app.get("/book/{book_id}")
async def get_book_list(book_id: int, db: AsyncSession = Depends(get_database)):
	result = await db.execute(select(Book).where(Book.id == book_id))
	book = result.scalar_one_or_none()
	return book

查询条件 - 模糊查询

模糊查询:like()

  • %:零个、一个或多个字符
  • _:一个单个字符
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.author.like("曹%")))
    book = result.scalars().all()
    return book

查询条件 - 与非查询

与非查询:

  • &:与
  • |:或
  • ~:非
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
	result = await db.execute(select(Book).where((Book.author == "曹雪芹") & (Book.price == 200)))
	book = result.scalars().all()
	return book

查询条件 - 包含查询

包含查询:

  • in_()
@app.get("/book/get_books")
async def get_book_list(db: AsyncSession = Depends(get_database)):
    id_list = [1, 2, 3, 4, 5, 6]
    result = await db.execute(select(Book).where(Book.id.in_(id_list)))
    book = result.scalars().all()
    return book

数据库操作 - 聚合查询

聚合计算:func.方法(模型类.属性)

  • count:统计行数量
  • avg:求平均值
  • max:求最大值
  • min:求最小值
  • sum:求和
@app.get("/book/count")
async def get_count(db: AsyncSession = Depends(get_database)):
    # result = await db.execute(select(func.count(Book.id)))
    # result = await db.execute(select(func.max(Book.price)))
    # result = await db.execute(select(func.sum(Book.price)))
    result = await db.execute(select(func.avg(Book.price)))
    count = result.scalar()
    return count

数据库操作 - 分页查询

分页查询:select().offset().limit()

  • ffset:跳过的记录数
  • limit:返回的记录数

在这里插入图片描述

@app.get("/book/get_books")
async def get_book_list(
    page: int = 1,
    page_size: int = 3,
    db: AsyncSession = Depends(get_database)
):
    skip = (page-1) * page_size
    stmt = select(Book).offset(skip).limit(page_size)
    result = await db.execute(stmt)
    books = result.scalars().all()
    return {"books": books}
数据库操作 - 新增

核心步骤:定义 ORM 对象 → 添加对象到事务:add(对象) → commit 提交到数据库

@app.post("/book/add_book")
async def add_book(book: Book, db: AsyncSession = Depends(get_database)):
    # 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典)
    book_obj = Book(**book.__dict__)
    db.add(book_obj)
    await db.commit()
    return book
数据库操作 - 更新

核心步骤:查询 get → 属性重新赋值 → commit 提交到数据库

@app.put("/book/update_book/{book_id}")
async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)):
    # 1. 查询
    book = await db.get(Book, book_id)
    if book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    # 2. 修改属性(重新赋值)
    book.bookname = data.bookname
    book.author = data.author
    book.price = data.price
    # 3. 提交
    await db.commit()
    return book
数据库操作 - 删除

核心步骤:查询 get → delete 删除 → commit 提交到数据库

@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)):
    db_book = await db.get(Book, book_id)
    if db_book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    await db.delete(db_book)
    await db.commit()
    return {"message": "Book deleted"}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐