前言

经过前两篇的学习,你已经掌握了FastAPI的基础知识和核心功能。从这篇开始,我们将进入高级应用阶段。

如果说前两篇教你的是"怎么用FastAPI写接口",那么这篇教的就是"怎么写生产级别的、可维护的、高性能的API"。我们会学习三个重要的主题:依赖注入、数据库ORM、以及企业级的项目结构。

这三个主题环环相扣。依赖注入让你的代码更优雅,ORM让你能真正存储数据,而合理的项目结构则让整个应用井井有条。让我们开始吧。


一、依赖注入系统(Depends)

什么是依赖注入?

依赖注入是一个听起来很高大上,但理解后会觉得非常自然的概念。

简单来说,当一个函数需要某些"准备工作"才能运行时,我们把这些准备工作抽离出来,形成一个"依赖"。然后让FastAPI在调用函数之前,自动帮我们完成这些准备工作。

一个直观的例子

假设你有一个获取用户列表的接口,它需要分页参数(第几页、每页多少条)。传统写法是这样的:

@app.get("/users")
def get_users(page: int = 1, size: int = 10):
    # 然后你需要在每个接口里都写一遍分页逻辑
    skip = (page - 1) * size
    return {"skip": skip, "limit": size}

问题来了:如果你有10个接口都需要分页,你就要重复写10遍这个计算逻辑。如果哪天分页规则变了,你要改10个地方。

依赖注入就是用来解决这个问题的。

基础依赖的使用

我们把分页参数的处理逻辑抽离成一个独立的函数:

from fastapi import FastAPI, Depends

app = FastAPI()

# 这是一个依赖函数,它封装了公共逻辑
def pagination(skip: int = 0, limit: int = 10):
    # 这里可以做任何预处理
    return {"skip": skip, "limit": limit}

# 在接口中使用这个依赖
@app.get("/users")
def get_users(params: dict = Depends(pagination)):
    # params 就是依赖函数返回的值
    return {"data": [], "skip": params["skip"], "limit": params["limit"]}

@app.get("/products")
def get_products(params: dict = Depends(pagination)):
    # 同一个依赖,可以在多个接口中复用
    return {"data": [], "skip": params["skip"], "limit": params["limit"]}

看到区别了吗?分页逻辑只写了一次,所有接口都可以复用。如果将来要修改分页规则,只需要改pagination这一个函数就够了。

类依赖:更优雅的写法

虽然函数依赖已经很好用了,但返回字典的方式不够优雅。我们可以使用类来实现更清晰的依赖:

from fastapi import Depends

# 定义一个类来封装依赖逻辑
class Pagination:
    def __init__(self, skip: int = 0, limit: int = 10):
        self.skip = skip
        self.limit = limit
    
    def offset(self):
        # 可以添加计算方法
        return self.skip
    
    def page_info(self):
        return {"skip": self.skip, "limit": self.limit}

# 在接口中使用类依赖
@app.get("/users")
def get_users(params: Pagination = Depends()):
    # params 就是 Pagination 的实例
    return {
        "data": [],
        "skip": params.skip,
        "limit": params.limit,
        "offset": params.offset()
    }

类依赖的好处是:你可以把相关的逻辑都封装在一个类里,代码更加结构化。

依赖注入的典型应用场景

依赖注入在真实项目中非常实用,常见场景包括:

1. 权限校验

from fastapi import Depends, Header, HTTPException

def verify_token(authorization: str = Header(...)):
    # 从请求头中获取token并验证
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="无效的token格式")
    token = authorization[7:]  # 去掉 "Bearer " 前缀
    # 这里应该有验证token的逻辑
    if token != "secret":
        raise HTTPException(status_code=401, detail="token无效")
    return {"user_id": 1, "username": "张三"}

@app.get("/profile")
def get_profile(user: dict = Depends(verify_token)):
    # 只有验证通过的请求才能进入这个接口
    return {"user": user}

2. 数据库会话获取

async def get_db():
    # 创建数据库连接
    db = SessionLocal()
    try:
        yield db  # 把连接交给接口使用
    finally:
        db.close()  # 接口执行完后自动关闭连接

@app.get("/users")
def get_users(db = Depends(get_db)):
    # 接口中可以直接使用db对象操作数据库
    return db.query(User).all()

3. 配置获取

from pydantic import BaseSettings

class Settings(BaseSettings):
    app_name: str = "My API"
    admin_email: str
    items_per_user: int = 50

settings = Settings()

def get_settings():
    return settings

@app.get("/info")
def get_info(settings: Settings = Depends(get_settings)):
    return {"app_name": settings.app_name, "admin_email": settings.admin_email}

依赖可以嵌套

一个依赖可以调用另一个依赖,形成依赖链:

def get_user_id(token: str = Header(...)):
    # 从token中解析出user_id
    return 1

def get_current_user(user_id: int = Depends(get_user_id)):
    # 根据user_id查询用户信息
    return {"id": user_id, "name": "张三"}

def check_permission(user: dict = Depends(get_current_user)):
    # 检查用户是否有权限
    if user["id"] != 1:
        raise HTTPException(status_code=403, detail="无权限")
    return user

@app.get("/admin")
def admin_route(user: dict = Depends(check_permission)):
    # 只有通过所有依赖检查的请求才能到达这里
    return {"message": "欢迎访问管理员页面"}

这种嵌套依赖非常强大。你可以把权限校验拆分成多个小步骤,每个步骤只做一件事,然后组合使用。


二、数据库ORM(SQLAlchemy)

到目前为止,我们的数据都是存在内存中的。服务一重启,数据就丢了。现在我们来学习如何把数据真正存储到数据库中。

什么是ORM?

ORM(Object Relational Mapping,对象关系映射)是一种技术,它让你可以用操作对象的方式来操作数据库,而不需要写SQL语句。

举个例子,没有ORM时,你要这样写:

INSERT INTO users (name, age) VALUES ('张三', 25);

有了ORM后,你可以这样写:

user = User(name="张三", age=25)
db.add(user)
db.commit()

ORM会帮你把Python代码翻译成SQL语句。这样做的好处是:

  • 不需要学习各种数据库的SQL方言

  • 利用IDE的代码补全,减少拼写错误

  • 自动处理参数转义,防止SQL注入

安装依赖

pip install sqlalchemy asyncmy
  • sqlalchemy:Python最流行的ORM框架

  • asyncmy:异步MySQL驱动,让数据库操作不阻塞

数据库配置

首先,我们需要配置数据库连接。这里以MySQL为例:

# core/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

# 数据库连接URL
# 格式:数据库类型+驱动://用户名:密码@主机:端口/数据库名
DATABASE_URL = "mysql+asyncmy://root:123456@localhost:3306/fastapi_db"

# 创建异步引擎
# engine 是数据库连接的核心对象,负责管理连接池
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # 打印SQL语句,方便调试,生产环境应设为False
    pool_size=10,  # 连接池大小
    max_overflow=20  # 连接池溢出后的最大连接数
)

# 创建会话工厂
# AsyncSessionLocal 可以创建数据库会话对象
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False  # 提交后不过期对象
)

# 创建模型基类
# 所有的数据模型都要继承这个类
Base = declarative_base()

# 获取数据库会话的依赖函数
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session  # 把会话交给接口使用
            await session.commit()  # 正常结束则提交事务
        except Exception:
            await session.rollback()  # 发生异常则回滚
            raise
        finally:
            await session.close()  # 关闭会话

这段代码看起来有点复杂,但记住核心就好:

  • engine:数据库引擎,管理连接

  • AsyncSessionLocal:会话工厂,创建会话

  • get_db:依赖函数,供接口获取数据库会话

定义数据模型

数据模型对应数据库中的表。我们用类的方式来定义表结构:

# models/user.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from datetime import datetime
from core.database import Base

# 定义一个基类,包含公共字段
class BaseModel(Base):
    __abstract__ = True  # 这个类不会创建表,只是用来继承
    
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    created_at = Column(DateTime, default=datetime.now, nullable=False)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, nullable=False)

# 用户模型
class User(BaseModel):
    __tablename__ = "users"  # 指定数据库中的表名
    
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(200), nullable=False)
    is_active = Column(Boolean, default=True)
    age = Column(Integer, nullable=True)

# 商品模型
class Product(BaseModel):
    __tablename__ = "products"
    
    name = Column(String(100), nullable=False)
    price = Column(Integer, nullable=False)  # 以分为单位,避免浮点精度问题
    stock = Column(Integer, default=0)
    category = Column(String(50), nullable=True)

代码解释:

  • __tablename__:指定数据库中的表名

  • Column:定义一列,括号里是数据类型

  • nullable=False:该字段不能为空

  • unique=True:该字段的值必须唯一

  • index=True:为该字段创建索引,提高查询速度

  • default:默认值

  • onupdate:更新时的动作

Pydantic模型(Schema)

ORM模型是给数据库用的,但我们不能直接把ORM对象返回给客户端,因为里面可能有敏感字段。我们需要定义Pydantic模型来控制输入和输出:

# schemas/user.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime

# 用户创建时的请求体
class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=6)
    age: Optional[int] = Field(None, ge=0, le=150)

# 用户响应(返回给客户端)
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    age: Optional[int]
    is_active: bool
    created_at: datetime
    
    class Config:
        orm_mode = True  # 允许从ORM对象转换

# 用户登录请求
class UserLogin(BaseModel):
    username: str
    password: str

注意orm_mode = True这个配置,它允许我们把SQLAlchemy的ORM对象直接转换成Pydantic模型,非常方便。

CRUD操作

现在我们来写真正的接口,使用数据库存储数据:

# api/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from typing import List

from core.database import get_db
from models.user import User
from schemas.user import UserCreate, UserResponse

router = APIRouter(prefix="/users", tags=["用户管理"])

# 创建用户(注册)
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
    # 检查用户名是否已存在
    result = await db.execute(select(User).where(User.username == user_data.username))
    existing_user = result.scalar_one_or_none()
    if existing_user:
        raise HTTPException(status_code=400, detail="用户名已被使用")
    
    # 创建新用户(实际项目中密码需要加密)
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=user_data.password,  # 实际应该用hash
        age=user_data.age
    )
    
    # 添加到数据库
    db.add(new_user)
    await db.commit()  # 提交事务
    await db.refresh(new_user)  # 刷新,获取数据库生成的字段(如id)
    
    return new_user

# 获取所有用户(分页)
@router.get("/", response_model=List[UserResponse])
async def get_all_users(
    skip: int = 0, 
    limit: int = 10,
    db: AsyncSession = Depends(get_db)
):
    # 查询用户列表
    result = await db.execute(select(User).offset(skip).limit(limit))
    users = result.scalars().all()
    return users

# 获取单个用户
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    
    return user

# 更新用户
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int, 
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    # 先查询用户是否存在
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    
    # 更新字段
    user.username = user_data.username
    user.email = user_data.email
    user.age = user_data.age
    
    await db.commit()
    await db.refresh(user)
    
    return user

# 删除用户
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    
    await db.delete(user)
    await db.commit()
    
    return None  # 204响应不需要返回内容

创建数据库表

在启动应用之前,我们需要先创建数据库表。可以写一个初始化脚本:

# init_db.py
import asyncio
from core.database import engine, Base
from models.user import User  # 导入模型,确保Base知道要创建哪些表

async def init():
    async with engine.begin() as conn:
        # 删除所有表(谨慎使用,会丢失数据)
        # await conn.run_sync(Base.metadata.drop_all)
        
        # 创建所有表
        await conn.run_sync(Base.metadata.create_all)
        print("数据库表创建成功!")

if __name__ == "__main__":
    asyncio.run(init())

关于异步操作的说明

你可能注意到了,我们所有的数据库操作都用了await。这是因为我们使用的是异步数据库驱动。

异步的好处是:当数据库在处理查询时,FastAPI可以去处理其他请求,不会阻塞。这在处理大量并发请求时非常重要。

简单的理解:

  • 同步:等数据库查完,才能做下一件事

  • 异步:告诉数据库"你查着,我先去处理别的请求",等数据库查完了再回来拿结果


三、FastAPI标准项目结构(企业级)

当你开始做一个真实项目时,代码组织非常重要。下面是FastAPI社区推荐的企业级项目结构:

fastapi-project/
│
├── main.py                 # 应用入口,创建FastAPI实例
├── .env                    # 环境变量(不提交到git)
├── .env.example            # 环境变量示例
├── requirements.txt        # 依赖列表
│
├── api/                    # 路由层
│   ├── __init__.py
│   ├── deps.py            # 公共依赖(如get_current_user)
│   └── v1/                # API版本1
│       ├── __init__.py
│       ├── users.py       # 用户相关路由
│       ├── products.py    # 商品相关路由
│       └── orders.py      # 订单相关路由
│
├── core/                   # 核心配置层
│   ├── __init__.py
│   ├── config.py          # 配置管理(读取.env)
│   ├── database.py        # 数据库连接配置
│   ├── security.py        # 安全相关(加密、JWT)
│   └── middleware.py      # 自定义中间件
│
├── models/                 # ORM模型层
│   ├── __init__.py
│   ├── user.py            # 用户模型
│   ├── product.py         # 商品模型
│   └── order.py           # 订单模型
│
├── schemas/                # Pydantic模型层
│   ├── __init__.py
│   ├── user.py            # 用户相关的请求/响应模型
│   ├── product.py
│   └── common.py          # 公共模型(如分页)
│
├── services/               # 业务逻辑层
│   ├── __init__.py
│   ├── user_service.py    # 用户业务逻辑
│   └── auth_service.py    # 认证业务逻辑
│
├── utils/                  # 工具函数
│   ├── __init__.py
│   ├── pagination.py      # 分页工具
│   └── validators.py      # 自定义验证器
│
└── tests/                  # 测试
    ├── __init__.py
    ├── test_users.py
    └── test_products.py

各层职责说明

api层(路由层)

  • 只负责接收请求和返回响应

  • 参数校验由FastAPI自动完成

  • 调用service层处理业务逻辑

  • 不直接操作数据库

services层(业务逻辑层)

  • 包含核心业务逻辑

  • 调用models层操作数据库

  • 可以调用其他service

  • 不直接处理HTTP请求

models层(ORM层)

  • 定义数据库表结构

  • 只负责与数据库交互

  • 不包含业务逻辑

schemas层(Pydantic层)

  • 定义请求体和响应体的结构

  • 数据验证规则

  • 序列化和反序列化

core层(核心配置层)

  • 配置管理(从环境变量读取)

  • 数据库连接

  • 安全相关(JWT、密码加密)

  • 中间件

utils层(工具层)

  • 通用工具函数

  • 与业务无关的辅助功能

核心配置文件示例

# core/config.py
from pydantic import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    # 应用配置
    APP_NAME: str = "FastAPI Project"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True
    
    # 数据库配置
    DATABASE_URL: str = "mysql+asyncmy://root:123456@localhost:3306/fastapi_db"
    
    # JWT配置
    SECRET_KEY: str = "your-secret-key-change-this"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    
    # 管理员配置
    FIRST_SUPERUSER: str = "admin@example.com"
    FIRST_SUPERUSER_PASSWORD: str = "admin123"
    
    class Config:
        env_file = ".env"  # 从.env文件读取环境变量

settings = Settings()

主文件示例

# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from core.config import settings
from core.database import engine, Base
from api.v1 import users, products

# 创建应用
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    docs_url="/docs" if settings.DEBUG else None,  # 生产环境可关闭文档
    redoc_url="/redoc" if settings.DEBUG else None
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.DEBUG else ["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(users.router)
app.include_router(products.router)

# 根路径
@app.get("/")
async def root():
    return {
        "message": f"Welcome to {settings.APP_NAME}",
        "version": settings.APP_VERSION
    }

# 启动事件(创建数据库表)
@app.on_event("startup")
async def startup():
    async with engine.begin() as conn:
        # 生产环境应该使用迁移工具(如Alembic)
        await conn.run_sync(Base.metadata.create_all)
    print("数据库初始化完成")

# 关闭事件
@app.on_event("shutdown")
async def shutdown():
    await engine.dispose()
    print("数据库连接已关闭")

四、实战:用户认证系统(JWT)

现在我们来整合所学知识,实现一个完整的用户认证系统。

安装JWT依赖

pip install python-jose[cryptography] passlib[bcrypt]
  • python-jose:处理JWT令牌

  • passlib:密码加密

安全工具函数

# core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from core.config import settings

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码"""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """对密码进行哈希加密"""
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """创建JWT访问令牌"""
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    
    return encoded_jwt

def decode_access_token(token: str) -> Optional[dict]:
    """解码JWT令牌"""
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        return None

用户服务

# services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from models.user import User
from schemas.user import UserCreate
from core.security import get_password_hash, verify_password

class UserService:
    """用户相关的业务逻辑"""
    
    @staticmethod
    async def get_user_by_username(db: AsyncSession, username: str):
        result = await db.execute(select(User).where(User.username == username))
        return result.scalar_one_or_none()
    
    @staticmethod
    async def get_user_by_email(db: AsyncSession, email: str):
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()
    
    @staticmethod
    async def get_user_by_id(db: AsyncSession, user_id: int):
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()
    
    @staticmethod
    async def create_user(db: AsyncSession, user_data: UserCreate):
        # 检查用户是否已存在
        existing = await UserService.get_user_by_username(db, user_data.username)
        if existing:
            return None, "用户名已被使用"
        
        existing_email = await UserService.get_user_by_email(db, user_data.email)
        if existing_email:
            return None, "邮箱已被注册"
        
        # 创建新用户(密码加密)
        new_user = User(
            username=user_data.username,
            email=user_data.email,
            hashed_password=get_password_hash(user_data.password),
            age=user_data.age
        )
        
        db.add(new_user)
        await db.commit()
        await db.refresh(new_user)
        
        return new_user, None
    
    @staticmethod
    async def authenticate_user(db: AsyncSession, username: str, password: str):
        user = await UserService.get_user_by_username(db, username)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

认证路由

# api/v1/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import timedelta

from core.database import get_db
from core.config import settings
from core.security import create_access_token
from services.user_service import UserService
from schemas.user import UserCreate, UserResponse

router = APIRouter(prefix="/auth", tags=["认证"])

# OAuth2密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# 注册
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
    user, error = await UserService.create_user(db, user_data)
    
    if error:
        raise HTTPException(status_code=400, detail=error)
    
    return user

# 登录
@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    user = await UserService.authenticate_user(db, form_data.username, form_data.password)
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # 创建访问令牌
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "user_id": user.id},
        expires_delta=access_token_expires
    )
    
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user_id": user.id,
        "username": user.username
    }

# 获取当前用户(依赖函数)
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
):
    from core.security import decode_access_token
    
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    username = payload.get("sub")
    if not username:
        raise HTTPException(status_code=401, detail="无效的token")
    
    user = await UserService.get_user_by_username(db, username)
    if not user:
        raise HTTPException(status_code=401, detail="用户不存在")
    
    return user

# 获取当前用户信息(需要认证)
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    return current_user

五、总结

这篇博客的内容比较多,我们回顾一下核心要点:

依赖注入(Depends)

  • 把公共逻辑抽离成依赖函数或依赖类

  • 多个接口可以复用同一个依赖

  • 依赖可以嵌套,形成依赖链

  • 典型应用:分页参数、权限校验、数据库会话

数据库ORM(SQLAlchemy)

  • ORM让你用对象的方式操作数据库

  • 异步驱动让数据库操作不阻塞事件循环

  • 需要定义:数据库配置、ORM模型、Pydantic模型

  • CRUD操作使用selectadddelete等方法

项目结构

  • 分层架构:api(路由)、services(业务)、models(ORM)、schemas(Pydantic)

  • 核心配置放在core目录

  • 工具函数放在utils目录

  • 测试放在tests目录

认证系统

  • 密码不能明文存储,需要用bcrypt加密

  • JWT用于无状态的用户认证

  • OAuth2PasswordBearer提供标准登录流程

  • 依赖注入实现get_current_user,方便在接口中使用

下一步学什么?

  • Alembic:数据库迁移管理,优雅地修改表结构

  • Redis:缓存和会话存储

  • Celery:异步任务队列(发送邮件、处理图片等)

  • Docker:容器化部署

  • Pytest:单元测试和集成测试

练习题

  1. 实现一个商品管理的完整API(增删改查),要求使用异步SQLAlchemy

  2. 为商品接口添加权限控制,只有登录用户才能创建商品

  3. 实现分页功能的通用依赖,可以在任何列表接口复用

恭喜你完成了FastAPI三篇入门教程!现在你已经具备了开发生产级API的能力。接下来的路,就靠实际项目来磨练了。加油!

 

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐