AI 全栈开发实战(3):用户系统 —— 注册、登录、JWT 认证与前端集成
·
前言
前两篇我们搭建了项目骨架,所有服务都能跑起来了。今天实现第一个完整的功能模块——用户系统。
用户系统是几乎所有 Web 应用的基础。我们将实现:
- 用户注册(邮箱 + 密码)
- 用户登录(JWT Token)
- 密码加密与验证
- 登录状态检查与 Token 刷新
- 完整的 API + 前端页面
1. 密码加密
密码绝对不能明文存储。我们使用 bcrypt 进行哈希加密。
# backend/app/services/auth.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""对密码进行 bcrypt 哈希。"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码与哈希是否匹配。"""
return pwd_context.verify(plain_password, hashed_password)
为什么用 bcrypt?
- 自动加盐(每个密码的 salt 不同)
- 可调节计算成本(随着硬件升级可以增加迭代次数)
- 抗 GPU/ASIC 加速(内存密集型,专用硬件加速效果有限)
绝对不要用:MD5、SHA-1、SHA-256(这些是哈希函数,不是密码哈希函数,没有抗暴力破解设计)。
2. JWT Token
使用 JWT(JSON Web Token)做身份认证。
# backend/app/services/auth.py(续)
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from typing import Optional
from app.config import settings
def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
"""创建 JWT Token。"""
payload = {
"sub": user_id, # subject——用户 ID
"iat": datetime.now(timezone.utc), # 签发时间
"exp": datetime.now(timezone.utc) + (expires_delta or timedelta(hours=settings.JWT_EXPIRATION_HOURS)), # 过期时间
"type": "access",
}
return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
"""解码并验证 JWT Token。"""
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
return payload
except JWTError:
return None
def get_current_user_id(token: str) -> Optional[str]:
"""从 Token 中提取用户 ID。"""
payload = decode_token(token)
if payload is None:
return None
return payload.get("sub")
3. 注册接口
3.1 Pydantic Schema
# backend/app/schemas/auth.py
from pydantic import BaseModel, EmailStr, Field
class RegisterRequest(BaseModel):
email: str = Field(..., description="邮箱地址")
password: str = Field(..., min_length=6, max_length=100, description="密码")
nickname: str = Field(..., min_length=1, max_length=50, description="昵称")
class LoginRequest(BaseModel):
email: str = Field(..., description="邮箱地址")
password: str = Field(..., description="密码")
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: "UserInfo"
class UserInfo(BaseModel):
id: str
email: str
nickname: str
avatar_url: str
class Config:
from_attributes = True
class ErrorResponse(BaseModel):
detail: str
3.2 注册逻辑
# backend/app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.user import User
from app.services.auth import hash_password
class UserService:
@staticmethod
async def register(db: AsyncSession, email: str, password: str, nickname: str) -> User:
"""注册新用户。"""
# 检查邮箱是否已存在
existing = await db.execute(select(User).where(User.email == email))
if existing.scalar_one_or_none():
raise ValueError("该邮箱已被注册")
# 创建用户
user = User(
email=email,
password_hash=hash_password(password),
nickname=nickname,
)
db.add(user)
await db.commit()
await db.refresh(user)
return user
@staticmethod
async def authenticate(db: AsyncSession, email: str, password: str) -> User:
"""验证用户登录。"""
from app.services.auth import verify_password
result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()
if not user:
raise ValueError("邮箱或密码错误")
if not verify_password(password, user.password_hash):
raise ValueError("邮箱或密码错误")
if not user.is_active:
raise ValueError("账户已被禁用")
return user
@staticmethod
async def get_by_id(db: AsyncSession, user_id: str) -> User:
"""通过 ID 获取用户。"""
from uuid import UUID
result = await db.execute(select(User).where(User.id == UUID(user_id)))
user = result.scalar_one_or_none()
if not user:
raise ValueError("用户不存在")
return user
3.3 注册路由
# backend/app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.auth import (
RegisterRequest, LoginRequest,
TokenResponse, UserInfo, ErrorResponse,
)
from app.services.user_service import UserService
from app.services.auth import create_access_token
router = APIRouter()
@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
"""用户注册。"""
try:
user = await UserService.register(
db, email=body.email, password=body.password, nickname=body.nickname
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
token = create_access_token(str(user.id))
return TokenResponse(
access_token=token,
user=UserInfo(
id=str(user.id),
email=user.email,
nickname=user.nickname,
avatar_url=user.avatar_url or "",
),
)
@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
"""用户登录。"""
try:
user = await UserService.authenticate(db, email=body.email, password=body.password)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
token = create_access_token(str(user.id))
return TokenResponse(
access_token=token,
user=UserInfo(
id=str(user.id),
email=user.email,
nickname=user.nickname,
avatar_url=user.avatar_url or "",
),
)
⚠️ 安全细节:
- 登录失败时返回
"邮箱或密码错误",不告诉用户到底是邮箱不存在还是密码错误——防止攻击者枚举邮箱 - 密码传输通过 HTTPS 加密,不在前端做任何哈希(前端哈希等于明文,因为攻击者可以直接调用 API)
4. 认证中间件
需要登录的接口通过依赖注入验证 Token:
# backend/app/services/auth.py(续)
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def require_auth(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
) -> User:
"""需要登录的接口依赖。"""
token = credentials.credentials
user_id = get_current_user_id(token)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 无效或已过期",
)
try:
user = await UserService.get_by_id(db, user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
)
return user
@router.get("/me", response_model=UserInfo)
async def get_me(user: User = Depends(require_auth)):
"""获取当前用户信息。"""
return UserInfo(
id=str(user.id),
email=user.email,
nickname=user.nickname,
avatar_url=user.avatar_url or "",
)
5. 注册路由到应用
# backend/app/main.py(更新)
from app.routers import auth
app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])
# 现在 protected 路由也可以使用了
from app.services.auth import require_auth
6. 前端登录/注册页面
6.1 API 层
// frontend/src/api/auth.ts
import api from "@/lib/api";
export interface UserInfo {
id: string;
email: string;
nickname: string;
avatar_url: string;
}
export interface TokenResponse {
access_token: string;
token_type: string;
user: UserInfo;
}
export async function register(
email: string,
password: string,
nickname: string
): Promise<TokenResponse> {
const { data } = await api.post("/auth/register", {
email,
password,
nickname,
});
return data;
}
export async function login(
email: string,
password: string
): Promise<TokenResponse> {
const { data } = await api.post("/auth/login", { email, password });
return data;
}
export async function getMe(): Promise<UserInfo> {
const { data } = await api.get("/auth/me");
return data;
}
6.2 Auth Context
// frontend/src/hooks/useAuth.tsx
import { createContext, useContext, useState, useEffect, ReactNode } from "react";
import { UserInfo, login as apiLogin, register as apiRegister, getMe } from "@/api/auth";
interface AuthContextType {
user: UserInfo | null;
token: string | null;
isLoading: boolean;
login: (email: string, password: string) => Promise<void>;
register: (email: string, password: string, nickname: string) => Promise<void>;
logout: () => void;
}
const AuthContext = createContext<AuthContextType | null>(null);
export function AuthProvider({ children }: { children: ReactNode }) {
const [user, setUser] = useState<UserInfo | null>(null);
const [token, setToken] = useState<string | null>(() =>
localStorage.getItem("token")
);
const [isLoading, setIsLoading] = useState(true);
useEffect(() => {
if (token) {
getMe()
.then(setUser)
.catch(() => {
localStorage.removeItem("token");
setToken(null);
})
.finally(() => setIsLoading(false));
} else {
setIsLoading(false);
}
}, [token]);
const login = async (email: string, password: string) => {
const res = await apiLogin(email, password);
localStorage.setItem("token", res.access_token);
setToken(res.access_token);
setUser(res.user);
};
const register = async (email: string, password: string, nickname: string) => {
const res = await apiRegister(email, password, nickname);
localStorage.setItem("token", res.access_token);
setToken(res.access_token);
setUser(res.user);
};
const logout = () => {
localStorage.removeItem("token");
setToken(null);
setUser(null);
};
return (
<AuthContext.Provider value={{ user, token, isLoading, login, register, logout }}>
{children}
</AuthContext.Provider>
);
}
export function useAuth() {
const ctx = useContext(AuthContext);
if (!ctx) throw new Error("useAuth must be used within AuthProvider");
return ctx;
}
6.3 登录页面
// frontend/src/pages/Login.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";
export default function Login() {
const [email, setEmail] = useState("");
const [password, setPassword] = useState("");
const [error, setError] = useState("");
const [loading, setLoading] = useState(false);
const { login } = useAuth();
const navigate = useNavigate();
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
setError("");
setLoading(true);
try {
await login(email, password);
navigate("/dashboard");
} catch (err: any) {
setError(err.response?.data?.detail || "登录失败");
} finally {
setLoading(false);
}
};
return (
<div className="min-h-screen flex items-center justify-center bg-gray-50 px-4">
<Card className="w-full max-w-md">
<CardHeader className="text-center">
<CardTitle className="text-2xl">登录 KNow</CardTitle>
<CardDescription>登录你的知识库账户</CardDescription>
</CardHeader>
<form onSubmit={handleSubmit}>
<CardContent className="space-y-4">
{error && (
<div className="bg-red-50 text-red-600 text-sm px-3 py-2 rounded-md">
{error}
</div>
)}
<div>
<label className="text-sm font-medium mb-1 block">邮箱</label>
<Input
type="email"
placeholder="your@email.com"
value={email}
onChange={(e) => setEmail(e.target.value)}
required
/>
</div>
<div>
<label className="text-sm font-medium mb-1 block">密码</label>
<Input
type="password"
placeholder="••••••••"
value={password}
onChange={(e) => setPassword(e.target.value)}
required
/>
</div>
</CardContent>
<CardFooter className="flex flex-col space-y-2">
<Button type="submit" className="w-full" disabled={loading}>
{loading ? "登录中..." : "登录"}
</Button>
<p className="text-sm text-gray-500">
还没有账户?{" "}
<Link to="/register" className="text-blue-600 hover:underline">
注册
</Link>
</p>
</CardFooter>
</form>
</Card>
</div>
);
}
6.4 注册页面
// frontend/src/pages/Register.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";
export default function Register() {
const [nickname, setNickname] = useState("");
const [email, setEmail] = useState("");
const [password, setPassword] = useState("");
const [confirmPassword, setConfirmPassword] = useState("");
const [error, setError] = useState("");
const [loading, setLoading] = useState(false);
const { register } = useAuth();
const navigate = useNavigate();
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
setError("");
if (password !== confirmPassword) {
setError("两次密码不一致");
return;
}
if (password.length < 6) {
setError("密码至少 6 位");
return;
}
setLoading(true);
try {
await register(email, password, nickname);
navigate("/dashboard");
} catch (err: any) {
setError(err.response?.data?.detail || "注册失败");
} finally {
setLoading(false);
}
};
return (
<div className="min-h-screen flex items-center justify-center bg-gray-50 px-4">
<Card className="w-full max-w-md">
<CardHeader className="text-center">
<CardTitle className="text-2xl">注册 KNow</CardTitle>
<CardDescription>创建你的知识库账户</CardDescription>
</CardHeader>
<form onSubmit={handleSubmit}>
<CardContent className="space-y-4">
{error && (
<div className="bg-red-50 text-red-600 text-sm px-3 py-2 rounded-md">
{error}
</div>
)}
<div>
<label className="text-sm font-medium mb-1 block">昵称</label>
<Input
placeholder="你的名字"
value={nickname}
onChange={(e) => setNickname(e.target.value)}
required
/>
</div>
<div>
<label className="text-sm font-medium mb-1 block">邮箱</label>
<Input
type="email"
placeholder="your@email.com"
value={email}
onChange={(e) => setEmail(e.target.value)}
required
/>
</div>
<div>
<label className="text-sm font-medium mb-1 block">密码</label>
<Input
type="password"
placeholder="至少 6 位"
value={password}
onChange={(e) => setPassword(e.target.value)}
required
/>
</div>
<div>
<label className="text-sm font-medium mb-1 block">确认密码</label>
<Input
type="password"
placeholder="再次输入密码"
value={confirmPassword}
onChange={(e) => setConfirmPassword(e.target.value)}
required
/>
</div>
</CardContent>
<CardFooter className="flex flex-col space-y-2">
<Button type="submit" className="w-full" disabled={loading}>
{loading ? "注册中..." : "注册"}
</Button>
<p className="text-sm text-gray-500">
已有账户?{" "}
<Link to="/login" className="text-blue-600 hover:underline">
登录
</Link>
</p>
</CardFooter>
</form>
</Card>
</div>
);
}
6.5 受保护的路由
// frontend/src/components/ProtectedRoute.tsx
import { Navigate } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
export function ProtectedRoute({ children }: { children: React.ReactNode }) {
const { user, isLoading } = useAuth();
if (isLoading) {
return (
<div className="min-h-screen flex items-center justify-center">
<div className="animate-spin h-8 w-8 border-4 border-blue-600 border-t-transparent rounded-full" />
</div>
);
}
if (!user) {
return <Navigate to="/login" replace />;
}
return <>{children}</>;
}
6.6 路由更新
// frontend/src/App.tsx
// 添加 ProtectedRoute
import { ProtectedRoute } from "@/components/ProtectedRoute";
// 在 Routes 中包裹需要登录的页面
<Route element={<ProtectedRoute><Layout /></ProtectedRoute>}>
<Route path="/dashboard" element={<Dashboard />} />
<Route path="/knowledge-bases/:id" element={<KnowledgeBaseDetail />} />
<Route path="/chat" element={<Chat />} />
</Route>
7. 验证
7.1 测试注册
# 注册
curl -X POST http://localhost:8000/api/auth/register \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"123456","nickname":"测试用户"}'
# 响应
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...","token_type":"bearer","user":{"id":"uuid","email":"test@example.com","nickname":"测试用户","avatar_url":""}}
7.2 测试登录
# 登录
curl -X POST http://localhost:8000/api/auth/login \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"123456"}'
# 使用 Token 访问受保护接口
curl http://localhost:8000/api/auth/me \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
7.3 测试错误密码
# 错误密码
curl -X POST http://localhost:8000/api/auth/login \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"wrong"}'
# 响应 401
{"detail":"邮箱或密码错误"}
8. 常见安全问题
| 问题 | 解决方案 |
|---|---|
| 密码明文传输 | HTTPS + 后端接收明文密码(不要前端哈希) |
| Token 被盗 | 设置合理的过期时间(72h),HTTPS 传输 |
| 暴力破解 | 登录失败计数 + 限流(后文实现) |
| SQL 注入 | 使用 ORM(SQLAlchemy)参数化查询 |
| XSS | 前端不要用 dangerouslySetInnerHTML,用安全的 Markdown 渲染 |
关于密码策略:目前只要求 6 位密码。实际产品建议:
- 最少 8 位
- 至少包含字母和数字
- 提供密码强度指示器(前端实时检测)
- 但不强制特殊字符(这只会让用户写密码在便利贴上)
总结
今天完成了完整的用户系统:
| 组件 | 说明 |
|---|---|
| 密码加密 | bcrypt 哈希存储 |
| JWT Token | 签发/验证/过期 |
| 注册接口 | 邮箱唯一性检查 + 密码规范 |
| 登录接口 | 密码验证 + Token 返回 |
| 认证中间件 | 保护需要登录的接口 |
| 前端登录页 | 完整的登录表单 + 错误处理 |
| 前端注册页 | 完整的注册表单 + 密码确认 |
| Auth Context | 全局认证状态管理 |
| 路由保护 | 未登录自动跳转登录页 |
现在用户可以用邮箱注册、登录,前端会保存 Token 并在后续请求中自动带上。
下一篇我们将实现知识库与文档管理——用户可以创建知识库、上传文件。
本文是 《AI 全栈开发实战——做一个真正的产品》 系列的第 3 篇。
系列目录:
- ✅ 产品定义与架构设计
- ✅ 技术选型与项目初始化
- ✅ 用户系统(注册/登录/JWT)← 你在这里
- 📝 知识库与文档管理
- 📝 文档处理 Pipeline
…本文由 Zyentor(智元界) 原创发布
本文发布于 Zyentor(智元界) —— AI 开发者社区
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)