认证与权限:JWT+OAuth2完整实现
认证与权限:JWT + OAuth2完整实现
文章信息
- 标题:认证与权限:JWT + OAuth2完整实现
- 字数:4500字
- 预估阅读时间:20分钟
- 难度:⭐⭐⭐☆☆
一、为什么需要认证?
在AI应用开发中,API安全性至关重要。根据OWASP Top 10(2023),身份验证相关漏洞排名前三。对于FastAPI应用,你需要:
- 用户身份识别:知道请求来自哪个用户
- 会话管理:无状态的JWT比session更易扩展
- 权限控制:不同用户有不同操作权限
- API安全:防止未授权访问
本文将带你实现完整的JWT认证系统,包括登录、受保护路由、Token刷新和RBAC权限控制。
二、环境准备
2.1 安装依赖
# 安装认证相关库
uv pip install python-jose[cryptography] passlib[bcrypt] python-multipart pydantic-settings
# python-jose: JWT编码/解码
# passlib: 密码哈希
# python-multipart: 文件上传(FastAPI内置)
# 验证安装
python -c "from jose import jwt; print(jwt.__version__)"
2.2 密码哈希对比
| 算法 | 速度 | 安全性 | 内存占用 | 推荐场景 |
|---|---|---|---|---|
| bcrypt | 中等 | ★★★★★ | ~4KB | 生产环境首选 |
| argon2 | 慢 | ★★★★★ | 可配置 | 高安全场景 |
| scrypt | 慢 | ★★★★☆ | 可配置 | 兼容旧系统 |
| PBKDF2 | 中等 | ★★★☆☆ | 固定 | 兼容旧系统 |
推荐:bcrypt是当前最佳平衡,argon2是未来趋势。
三、配置与模型
3.1 配置
创建app/config.py:
"""认证配置"""
from pydantic_settings import BaseSettings
from typing import Optional
from datetime import timedelta
class AuthSettings(BaseSettings):
"""认证配置"""
# JWT配置
jwt_secret_key: str = "your-secret-key-change-in-production"
jwt_algorithm: str = "HS256"
jwt_access_token_expire_minutes: int = 30 # 访问令牌30分钟
jwt_refresh_token_expire_days: int = 7 # 刷新令牌7天
# OAuth2配置
oauth2_scheme_name: str = "bearer"
# 密码配置
password_min_length: int = 6
model_config = {
"env_file": ".env",
"extra": "ignore"
}
auth_settings = AuthSettings()
3.2 用户模型
"""认证相关模型"""
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
from enum import Enum
class UserRole(str, Enum):
"""用户角色"""
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
# Token模型
class Token(BaseModel):
"""Token响应"""
access_token: str
refresh_token: Optional[str] = None
token_type: str = "bearer"
expires_in: int
class TokenPayload(BaseModel):
"""Token载荷"""
sub: str # 用户ID
exp: int # 过期时间
type: str = "access" # token类型
class TokenRefreshRequest(BaseModel):
"""Token刷新请求"""
refresh_token: str
# 登录请求
class LoginRequest(BaseModel):
"""登录请求"""
username: str
password: str
# 用户创建请求(带角色)
class UserCreate(BaseModel):
"""用户创建请求"""
username: str = Field(..., min_length=3, max_length=50)
email: str
password: str = Field(..., min_length=6)
role: UserRole = UserRole.USER
3.3 密码哈希工具
使用 passlib + bcrypt 算法对密码进行哈希。bcrypt 会自动加盐,每次哈希结果都不同,但验证时能正确匹配。
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
注意:
deprecated="auto"表示如果未来 bcrypt 被标记为不安全,passlib 会自动提示迁移。不要用hashlib.md5或hashlib.sha256做密码哈希——它们太快,容易被暴力破解。
四、JWT令牌
4.1 令牌创建
JWT 令牌由三部分组成:Header(算法)、Payload(数据)、Signature(签名)。访问令牌(access token)有效期短(如 30 分钟),刷新令牌(refresh token)有效期长(如 7 天)。
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
from typing import Optional
from app.config import auth_settings
def create_access_token(
user_id: int, username: str, role: str = "user",
expires_delta: Optional[timedelta] = None
) -> str:
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=auth_settings.jwt_access_token_expire_minutes)
)
to_encode = {
"sub": str(user_id), # 用户ID,JWT 标准字段
"username": username,
"role": role,
"type": "access", # 区分 access 和 refresh
"exp": expire, # 过期时间
"iat": datetime.now(timezone.utc), # 签发时间
}
return jwt.encode(to_encode, auth_settings.jwt_secret_key, algorithm=auth_settings.jwt_algorithm)
def create_refresh_token(user_id: int) -> str:
expire = datetime.now(timezone.utc) + timedelta(days=auth_settings.jwt_refresh_token_expire_days)
to_encode = {"sub": str(user_id), "type": "refresh", "exp": expire, "iat": datetime.now(timezone.utc)}
return jwt.encode(to_encode, auth_settings.jwt_secret_key, algorithm=auth_settings.jwt_algorithm)
解码和验证令牌:
def decode_token(token: str) -> dict:
try:
return jwt.decode(token, auth_settings.jwt_secret_key, algorithms=[auth_settings.jwt_algorithm])
except JWTError as e:
raise ValueError(f"令牌解析失败: {str(e)}")
def verify_token_type(token: str, expected_type: str) -> bool:
try:
return decode_token(token).get("type") == expected_type
except ValueError:
return False
为什么要区分
type? 刷新令牌只能用来获取新的访问令牌,不能直接访问 API。如果攻击者偷到了刷新令牌,没有type字段区分的话,就能直接用它访问受保护的接口。
4.2 令牌验证
"""令牌验证和刷新"""
from jose import jwt, JWTError, ExpiredSignatureError
from datetime import datetime, timedelta, timezone
from app.config import auth_settings
def verify_access_token(token: str) -> dict:
"""验证访问令牌"""
try:
payload = jwt.decode(
token,
auth_settings.jwt_secret_key,
algorithms=[auth_settings.jwt_algorithm],
options={"verify_exp": True}
)
# 验证类型
if payload.get("type") != "access":
raise ValueError("无效的token类型")
return payload
except ExpiredSignatureError:
raise ValueError("令牌已过期")
except JWTError as e:
raise ValueError(f"令牌验证失败: {str(e)}")
def refresh_access_token(refresh_token: str) -> dict:
"""使用刷新令牌获取新的访问令牌"""
try:
payload = jwt.decode(
refresh_token,
auth_settings.jwt_secret_key,
algorithms=[auth_settings.jwt_algorithm],
options={"verify_exp": True}
)
# 验证类型
if payload.get("type") != "refresh":
raise ValueError("无效的刷新令牌")
return {
"user_id": int(payload.get("sub")),
"type": "refresh"
}
except ExpiredSignatureError:
raise ValueError("刷新令牌已过期")
except JWTError as e:
raise ValueError(f"刷新令牌验证失败: {str(e)}")
五、OAuth2认证
5.1 FastAPI安全依赖
"""OAuth2依赖"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError, ExpiredSignatureError
from typing import Optional
from app.database import get_db
from app.config import auth_settings
from app.models.user import User
from app.crud import user as crud_user
# OAuth2 Bearer方案
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/auth/login",
scheme_name=auth_settings.oauth2_scheme_name,
auto_error=False,
)
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
"""获取当前用户(必须登录)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的凭证",
headers={"WWW-Authenticate": "Bearer"},
)
if not token:
raise credentials_exception
try:
payload = jwt.decode(
token,
auth_settings.jwt_secret_key,
algorithms=[auth_settings.jwt_algorithm]
)
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except (JWTError, ExpiredSignatureError):
raise credentials_exception
user = await crud_user.get_user_by_id(db, int(user_id))
if user is None:
raise credentials_exception
return user
async def get_current_user_optional(
token: Optional[str] = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> Optional[User]:
"""获取当前用户(可选)"""
if not token:
return None
try:
payload = jwt.decode(
token,
auth_settings.jwt_secret_key,
algorithms=[auth_settings.jwt_algorithm]
)
user_id: str = payload.get("sub")
if user_id is None:
return None
except (JWTError, ExpiredSignatureError):
return None
return await crud_user.get_user_by_id(db, int(user_id))
5.2 角色验证
"""角色验证"""
from fastapi import Depends, HTTPException, status
from functools import wraps
from typing import Callable
from app.models.user import User
from app.schemas.user import UserRole
def require_role(*allowed_roles: UserRole):
"""角色验证装饰器"""
def role_checker(current_user: User = Depends(get_current_user)) -> User:
if current_user.role not in allowed_roles and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足"
)
return current_user
return role_checker
# 使用示例
# @router.delete("/users/{user_id}")
# @require_role(UserRole.ADMIN)
# async def delete_user(user_id: int, ...):
# ...
5.3 权限控制守卫
"""权限守卫"""
from fastapi import Depends, HTTPException, status
from typing import List
from app.models.user import User
class Permission:
"""权限定义"""
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
# 角色权限映射
ROLE_PERMISSIONS = {
UserRole.USER: [Permission.READ],
UserRole.MODERATOR: [Permission.READ, Permission.WRITE],
UserRole.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
}
def check_permission(user: User, permission: str) -> bool:
"""检查用户权限"""
if user.is_admin:
return True
role_perms = ROLE_PERMISSIONS.get(user.role, [])
return permission in role_perms
def require_permission(permission: str):
"""权限验证装饰器"""
def permission_checker(
current_user: User = Depends(get_current_user)
) -> User:
if not check_permission(current_user, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要 '{permission}' 权限"
)
return current_user
return permission_checker
# 使用示例
# @router.post("/items")
# @require_permission(Permission.WRITE)
# async def create_item(...):
# ...
六、登录接口
6.1 认证路由
创建app/routers/auth.py。认证路由包含注册、登录、刷新令牌三个核心接口。
先处理注册——注册时密码必须哈希存储,不能明文:
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.crud import user as crud_user
from app.models.user import User
from app.schemas.auth import Token, UserCreate, UserResponse, TokenRefreshRequest
from app.utils.auth import (
hash_password, create_access_token, create_refresh_token,
verify_password, refresh_access_token,
)
from app.config import auth_settings
router = APIRouter(prefix="/auth", tags=["认证"])
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_create: UserCreate, db: AsyncSession = Depends(get_db)):
if await crud_user.get_user_by_username(db, user_create.username):
raise HTTPException(status_code=400, detail="用户名已存在")
if await crud_user.get_user_by_email(db, user_create.email):
raise HTTPException(status_code=400, detail="邮箱已存在")
user_data = user_create.model_dump()
user_data["password"] = hash_password(user_data.pop("password"))
user_data["role"] = user_create.role.value
db_user = User(**user_data)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
登录接口遵循 OAuth2 规范,使用 OAuth2PasswordRequestForm 接收 username 和 password:
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
user = await crud_user.get_user_by_username(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(status_code=403, detail="用户已被禁用")
return Token(
access_token=create_access_token(user.id, user.username, getattr(user, 'role', 'user')),
refresh_token=create_refresh_token(user.id),
token_type="bearer",
expires_in=auth_settings.jwt_access_token_expire_minutes * 60,
)
刷新接口用 refresh token 换取新的 access token,避免用户频繁登录:
@router.post("/refresh", response_model=Token)
async def refresh_token(request: TokenRefreshRequest, db: AsyncSession = Depends(get_db)):
result = refresh_access_token(request.refresh_token)
user = await crud_user.get_user_by_id(db, result["user_id"])
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="无效的用户")
return Token(
access_token=create_access_token(user.id, user.username, getattr(user, 'role', 'user')),
refresh_token=create_refresh_token(user.id),
token_type="bearer",
expires_in=auth_settings.jwt_access_token_expire_minutes * 60,
)
@router.post("/logout")
async def logout():
return {"message": "成功登出"} # JWT 无状态,客户端删除 token 即可
为什么登出接口是空的? JWT 是无状态的,服务端不存储 token。真正的登出由客户端删除本地 token 实现。如果需要服务端强制失效,可以用 Redis 维护一个 token 黑名单。
七、受保护路由
7.1 用户路由
"""受保护的路由示例"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.crud import user as crud_user
from app.crud import item as crud_item
from app.models.user import User
from app.schemas.user import UserResponse, ItemResponse, ItemCreate, ItemUpdate
from app.utils.auth import get_current_user
router = APIRouter(prefix="/api", tags=["API"])
@router.get("/users/me", response_model=UserResponse)
async def get_my_profile(
current_user: User = Depends(get_current_user)
):
"""获取当前用户信息"""
return current_user
@router.get("/items", response_model=list[ItemResponse])
async def list_my_items(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""获取我的商品列表"""
return await crud_item.get_items(db, owner_id=current_user.id)
@router.post("/items", response_model=ItemResponse, status_code=status.HTTP_201_CREATED)
async def create_my_item(
item: ItemCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""创建商品"""
return await crud_item.create_item(db, item, current_user.id)
@router.patch("/items/{item_id}", response_model=ItemResponse)
async def update_my_item(
item_id: int,
item_update: ItemUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""更新商品"""
item = await crud_item.get_item_by_id(db, item_id)
if not item or item.owner_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)
return await crud_item.update_item(db, item_id, item_update)
@router.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_my_item(
item_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""删除商品"""
item = await crud_item.get_item_by_id(db, item_id)
if not item or item.owner_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)
await crud_item.delete_item(db, item_id)
return None
7.2 管理员路由
"""管理员路由"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.crud import user as crud_user
from app.models.user import User
from app.schemas.user import UserResponse, UserUpdate
from app.schemas.user import UserRole
from app.utils.auth import get_current_user
from app.utils.auth import require_role
router = APIRouter(prefix="/admin", tags=["管理员"])
@router.get("/users", response_model=list[UserResponse])
async def list_all_users(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""用户列表(管理员)"""
# 验证角色
if current_user.role != UserRole.ADMIN and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
return await crud_user.get_users(db, skip=skip, limit=limit)
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""删除用户(管理员)"""
if current_user.role != UserRole.ADMIN and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
# 不能删除自己
if user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="不能删除自己"
)
success = await crud_user.delete_user(db, user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
return None
@router.patch("/users/{user_id}/role", response_model=UserResponse)
async def change_user_role(
user_id: int,
role: UserRole,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""修改用户角色(管理员)"""
if current_user.role != UserRole.ADMIN and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
user = await crud_user.get_user_by_id(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="用户不存在"
)
user.role = role.value
await db.commit()
await db.refresh(user)
return user
八、Token刷新机制
8.1 自动刷新
"""自动刷新中间件"""
from fastapi import Request, HTTPException, status
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Callable
import time
class TokenRefreshMiddleware(BaseHTTPMiddleware):
"""Token刷新中间件"""
async def dispatch(self, request: Request, call_next: Callable):
response = await call_next(request)
# 检查token是否即将过期(在响应头中添加)
# 客户端可以根据此信息决定是否刷新
response.headers["X-Token-Type"] = "bearer"
return response
8.2 客户端刷新
// 前端刷新技术
class AuthClient {
constructor() {
this.accessToken = null;
this.refreshToken = null;
}
async refreshToken() {
const response = await fetch('/api/auth/refresh', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
refresh_token: this.refreshToken
})
});
if (!response.ok) {
// 刷新失败,需要重新登录
this.logout();
return false;
}
const data = await response.json();
this.accessToken = data.access_token;
this.refreshToken = data.refresh_token;
return true;
}
async request(url, options = {}) {
// 自动刷新逻辑
if (this.isTokenExpiringSoon()) {
await this.refreshToken();
}
// 添加token到请求
options.headers = {
...options.headers,
'Authorization': `Bearer ${this.accessToken}`
};
return fetch(url, options);
}
isTokenExpiringSoon() {
// 检查token是否将在5分钟内过期
// 需要解码token检查exp声明
return true;
}
}
九、踩坑记录
9.1 常见错误
错误1:JWT过期时间验证
# 错误写法(过期的验证方式)
payload = jwt.decode(token, key, algorithms=["HS256"])
# 没有verify_exp选项
# 正确写法
payload = jwt.decode(
token,
key,
algorithms=["HS256"],
options={"verify_exp": True} # 显式验证过期
)
错误2:密码哈希
# 错误写法(不安全)
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest() # 不安全,可被破解
# 正确写法(使用bcrypt)
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
password_hash = pwd_context.hash(password)
错误3:Token类型混淆
# 错误写法
def create_token(user_id):
return jwt.encode({"sub": str(user_id), ...}) # 没有type
# 正确写法
def create_access_token(user_id):
return jwt.encode({"sub": str(user_id), "type": "access", ...})
def create_refresh_token(user_id):
return jwt.encode({"sub": str(user_id), "type": "refresh", ...})
9.2 安全建议
| 问题 | 解决方案 |
|---|---|
| Token泄露 | 使用HTTPS,设置短过期时间 |
| CSRF攻击 | 使用SameSite Cookie |
| XSS攻击 | 设置HttpOnly Cookie |
| 重放攻击 | 使用nonce/一次性token |
| 密码暴力破解 | 限流+验证码 |
踩坑记录6:生产环境中,JWT密钥必须使用环境变量或密钥管理系统(AWS Secrets Manager、HashiCorp Vault),不能硬编码!
十、总结
本文我们完成了:
- ✅ JWT令牌创建和解码
- ✅ OAuth2认证流程
- ✅ 密码哈希和验证
- ✅ 受保护路由实现
- ✅ Token刷新机制
- ✅ RBAC权限控制
- ✅ 角色验证装饰器
一套完整的认证系统是AI应用的基础。下一篇文章我们将学习如何调用DeepSeek API搭建智能助手。
下篇预告:《用FastAPI调用DeepSeek API搭智能助手》
- 流式输出
- 多轮对话
- 价格优化
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)