前言

前两篇我们搭建了项目骨架,所有服务都能跑起来了。今天实现第一个完整的功能模块——用户系统

用户系统是几乎所有 Web 应用的基础。我们将实现:

  1. 用户注册(邮箱 + 密码)
  2. 用户登录(JWT Token)
  3. 密码加密与验证
  4. 登录状态检查与 Token 刷新
  5. 完整的 API + 前端页面

1. 密码加密

密码绝对不能明文存储。我们使用 bcrypt 进行哈希加密。

# backend/app/services/auth.py
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """对密码进行 bcrypt 哈希。"""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码与哈希是否匹配。"""
    return pwd_context.verify(plain_password, hashed_password)

为什么用 bcrypt?

  • 自动加盐(每个密码的 salt 不同)
  • 可调节计算成本(随着硬件升级可以增加迭代次数)
  • 抗 GPU/ASIC 加速(内存密集型,专用硬件加速效果有限)

绝对不要用:MD5、SHA-1、SHA-256(这些是哈希函数,不是密码哈希函数,没有抗暴力破解设计)。

2. JWT Token

使用 JWT(JSON Web Token)做身份认证。

# backend/app/services/auth.py(续)
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from typing import Optional

from app.config import settings


def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """创建 JWT Token。"""
    payload = {
        "sub": user_id,  # subject——用户 ID
        "iat": datetime.now(timezone.utc),  # 签发时间
        "exp": datetime.now(timezone.utc) + (expires_delta or timedelta(hours=settings.JWT_EXPIRATION_HOURS)),  # 过期时间
        "type": "access",
    }
    return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """解码并验证 JWT Token。"""
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
        return payload
    except JWTError:
        return None


def get_current_user_id(token: str) -> Optional[str]:
    """从 Token 中提取用户 ID。"""
    payload = decode_token(token)
    if payload is None:
        return None
    return payload.get("sub")

3. 注册接口

3.1 Pydantic Schema

# backend/app/schemas/auth.py
from pydantic import BaseModel, EmailStr, Field


class RegisterRequest(BaseModel):
    email: str = Field(..., description="邮箱地址")
    password: str = Field(..., min_length=6, max_length=100, description="密码")
    nickname: str = Field(..., min_length=1, max_length=50, description="昵称")


class LoginRequest(BaseModel):
    email: str = Field(..., description="邮箱地址")
    password: str = Field(..., description="密码")


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: "UserInfo"


class UserInfo(BaseModel):
    id: str
    email: str
    nickname: str
    avatar_url: str

    class Config:
        from_attributes = True


class ErrorResponse(BaseModel):
    detail: str

3.2 注册逻辑

# backend/app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.user import User
from app.services.auth import hash_password


class UserService:

    @staticmethod
    async def register(db: AsyncSession, email: str, password: str, nickname: str) -> User:
        """注册新用户。"""
        # 检查邮箱是否已存在
        existing = await db.execute(select(User).where(User.email == email))
        if existing.scalar_one_or_none():
            raise ValueError("该邮箱已被注册")

        # 创建用户
        user = User(
            email=email,
            password_hash=hash_password(password),
            nickname=nickname,
        )
        db.add(user)
        await db.commit()
        await db.refresh(user)
        return user

    @staticmethod
    async def authenticate(db: AsyncSession, email: str, password: str) -> User:
        """验证用户登录。"""
        from app.services.auth import verify_password

        result = await db.execute(select(User).where(User.email == email))
        user = result.scalar_one_or_none()

        if not user:
            raise ValueError("邮箱或密码错误")

        if not verify_password(password, user.password_hash):
            raise ValueError("邮箱或密码错误")

        if not user.is_active:
            raise ValueError("账户已被禁用")

        return user

    @staticmethod
    async def get_by_id(db: AsyncSession, user_id: str) -> User:
        """通过 ID 获取用户。"""
        from uuid import UUID
        result = await db.execute(select(User).where(User.id == UUID(user_id)))
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("用户不存在")
        return user

3.3 注册路由

# backend/app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.auth import (
    RegisterRequest, LoginRequest,
    TokenResponse, UserInfo, ErrorResponse,
)
from app.services.user_service import UserService
from app.services.auth import create_access_token

router = APIRouter()


@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
async def register(body: RegisterRequest, db: AsyncSession = Depends(get_db)):
    """用户注册。"""
    try:
        user = await UserService.register(
            db, email=body.email, password=body.password, nickname=body.nickname
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

    token = create_access_token(str(user.id))
    return TokenResponse(
        access_token=token,
        user=UserInfo(
            id=str(user.id),
            email=user.email,
            nickname=user.nickname,
            avatar_url=user.avatar_url or "",
        ),
    )


@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
    """用户登录。"""
    try:
        user = await UserService.authenticate(db, email=body.email, password=body.password)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))

    token = create_access_token(str(user.id))
    return TokenResponse(
        access_token=token,
        user=UserInfo(
            id=str(user.id),
            email=user.email,
            nickname=user.nickname,
            avatar_url=user.avatar_url or "",
        ),
    )

⚠️ 安全细节

  • 登录失败时返回 "邮箱或密码错误",不告诉用户到底是邮箱不存在还是密码错误——防止攻击者枚举邮箱
  • 密码传输通过 HTTPS 加密,不在前端做任何哈希(前端哈希等于明文,因为攻击者可以直接调用 API)

4. 认证中间件

需要登录的接口通过依赖注入验证 Token:

# backend/app/services/auth.py(续)
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()


async def require_auth(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """需要登录的接口依赖。"""
    token = credentials.credentials
    user_id = get_current_user_id(token)

    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token 无效或已过期",
        )

    try:
        user = await UserService.get_by_id(db, user_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )

    return user


@router.get("/me", response_model=UserInfo)
async def get_me(user: User = Depends(require_auth)):
    """获取当前用户信息。"""
    return UserInfo(
        id=str(user.id),
        email=user.email,
        nickname=user.nickname,
        avatar_url=user.avatar_url or "",
    )

5. 注册路由到应用

# backend/app/main.py(更新)
from app.routers import auth

app.include_router(auth.router, prefix="/api/auth", tags=["Auth"])

# 现在 protected 路由也可以使用了
from app.services.auth import require_auth

6. 前端登录/注册页面

6.1 API 层

// frontend/src/api/auth.ts
import api from "@/lib/api";

export interface UserInfo {
  id: string;
  email: string;
  nickname: string;
  avatar_url: string;
}

export interface TokenResponse {
  access_token: string;
  token_type: string;
  user: UserInfo;
}

export async function register(
  email: string,
  password: string,
  nickname: string
): Promise<TokenResponse> {
  const { data } = await api.post("/auth/register", {
    email,
    password,
    nickname,
  });
  return data;
}

export async function login(
  email: string,
  password: string
): Promise<TokenResponse> {
  const { data } = await api.post("/auth/login", { email, password });
  return data;
}

export async function getMe(): Promise<UserInfo> {
  const { data } = await api.get("/auth/me");
  return data;
}

6.2 Auth Context

// frontend/src/hooks/useAuth.tsx
import { createContext, useContext, useState, useEffect, ReactNode } from "react";
import { UserInfo, login as apiLogin, register as apiRegister, getMe } from "@/api/auth";

interface AuthContextType {
  user: UserInfo | null;
  token: string | null;
  isLoading: boolean;
  login: (email: string, password: string) => Promise<void>;
  register: (email: string, password: string, nickname: string) => Promise<void>;
  logout: () => void;
}

const AuthContext = createContext<AuthContextType | null>(null);

export function AuthProvider({ children }: { children: ReactNode }) {
  const [user, setUser] = useState<UserInfo | null>(null);
  const [token, setToken] = useState<string | null>(() =>
    localStorage.getItem("token")
  );
  const [isLoading, setIsLoading] = useState(true);

  useEffect(() => {
    if (token) {
      getMe()
        .then(setUser)
        .catch(() => {
          localStorage.removeItem("token");
          setToken(null);
        })
        .finally(() => setIsLoading(false));
    } else {
      setIsLoading(false);
    }
  }, [token]);

  const login = async (email: string, password: string) => {
    const res = await apiLogin(email, password);
    localStorage.setItem("token", res.access_token);
    setToken(res.access_token);
    setUser(res.user);
  };

  const register = async (email: string, password: string, nickname: string) => {
    const res = await apiRegister(email, password, nickname);
    localStorage.setItem("token", res.access_token);
    setToken(res.access_token);
    setUser(res.user);
  };

  const logout = () => {
    localStorage.removeItem("token");
    setToken(null);
    setUser(null);
  };

  return (
    <AuthContext.Provider value={{ user, token, isLoading, login, register, logout }}>
      {children}
    </AuthContext.Provider>
  );
}

export function useAuth() {
  const ctx = useContext(AuthContext);
  if (!ctx) throw new Error("useAuth must be used within AuthProvider");
  return ctx;
}

6.3 登录页面

// frontend/src/pages/Login.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";

export default function Login() {
  const [email, setEmail] = useState("");
  const [password, setPassword] = useState("");
  const [error, setError] = useState("");
  const [loading, setLoading] = useState(false);
  const { login } = useAuth();
  const navigate = useNavigate();

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError("");
    setLoading(true);

    try {
      await login(email, password);
      navigate("/dashboard");
    } catch (err: any) {
      setError(err.response?.data?.detail || "登录失败");
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="min-h-screen flex items-center justify-center bg-gray-50 px-4">
      <Card className="w-full max-w-md">
        <CardHeader className="text-center">
          <CardTitle className="text-2xl">登录 KNow</CardTitle>
          <CardDescription>登录你的知识库账户</CardDescription>
        </CardHeader>
        <form onSubmit={handleSubmit}>
          <CardContent className="space-y-4">
            {error && (
              <div className="bg-red-50 text-red-600 text-sm px-3 py-2 rounded-md">
                {error}
              </div>
            )}
            <div>
              <label className="text-sm font-medium mb-1 block">邮箱</label>
              <Input
                type="email"
                placeholder="your@email.com"
                value={email}
                onChange={(e) => setEmail(e.target.value)}
                required
              />
            </div>
            <div>
              <label className="text-sm font-medium mb-1 block">密码</label>
              <Input
                type="password"
                placeholder="••••••••"
                value={password}
                onChange={(e) => setPassword(e.target.value)}
                required
              />
            </div>
          </CardContent>
          <CardFooter className="flex flex-col space-y-2">
            <Button type="submit" className="w-full" disabled={loading}>
              {loading ? "登录中..." : "登录"}
            </Button>
            <p className="text-sm text-gray-500">
              还没有账户?{" "}
              <Link to="/register" className="text-blue-600 hover:underline">
                注册
              </Link>
            </p>
          </CardFooter>
        </form>
      </Card>
    </div>
  );
}

6.4 注册页面

// frontend/src/pages/Register.tsx
import { useState } from "react";
import { useNavigate, Link } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Card, CardContent, CardDescription, CardFooter, CardHeader, CardTitle } from "@/components/ui/card";

export default function Register() {
  const [nickname, setNickname] = useState("");
  const [email, setEmail] = useState("");
  const [password, setPassword] = useState("");
  const [confirmPassword, setConfirmPassword] = useState("");
  const [error, setError] = useState("");
  const [loading, setLoading] = useState(false);
  const { register } = useAuth();
  const navigate = useNavigate();

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setError("");

    if (password !== confirmPassword) {
      setError("两次密码不一致");
      return;
    }
    if (password.length < 6) {
      setError("密码至少 6 位");
      return;
    }

    setLoading(true);
    try {
      await register(email, password, nickname);
      navigate("/dashboard");
    } catch (err: any) {
      setError(err.response?.data?.detail || "注册失败");
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="min-h-screen flex items-center justify-center bg-gray-50 px-4">
      <Card className="w-full max-w-md">
        <CardHeader className="text-center">
          <CardTitle className="text-2xl">注册 KNow</CardTitle>
          <CardDescription>创建你的知识库账户</CardDescription>
        </CardHeader>
        <form onSubmit={handleSubmit}>
          <CardContent className="space-y-4">
            {error && (
              <div className="bg-red-50 text-red-600 text-sm px-3 py-2 rounded-md">
                {error}
              </div>
            )}
            <div>
              <label className="text-sm font-medium mb-1 block">昵称</label>
              <Input
                placeholder="你的名字"
                value={nickname}
                onChange={(e) => setNickname(e.target.value)}
                required
              />
            </div>
            <div>
              <label className="text-sm font-medium mb-1 block">邮箱</label>
              <Input
                type="email"
                placeholder="your@email.com"
                value={email}
                onChange={(e) => setEmail(e.target.value)}
                required
              />
            </div>
            <div>
              <label className="text-sm font-medium mb-1 block">密码</label>
              <Input
                type="password"
                placeholder="至少 6 位"
                value={password}
                onChange={(e) => setPassword(e.target.value)}
                required
              />
            </div>
            <div>
              <label className="text-sm font-medium mb-1 block">确认密码</label>
              <Input
                type="password"
                placeholder="再次输入密码"
                value={confirmPassword}
                onChange={(e) => setConfirmPassword(e.target.value)}
                required
              />
            </div>
          </CardContent>
          <CardFooter className="flex flex-col space-y-2">
            <Button type="submit" className="w-full" disabled={loading}>
              {loading ? "注册中..." : "注册"}
            </Button>
            <p className="text-sm text-gray-500">
              已有账户?{" "}
              <Link to="/login" className="text-blue-600 hover:underline">
                登录
              </Link>
            </p>
          </CardFooter>
        </form>
      </Card>
    </div>
  );
}

6.5 受保护的路由

// frontend/src/components/ProtectedRoute.tsx
import { Navigate } from "react-router-dom";
import { useAuth } from "@/hooks/useAuth";

export function ProtectedRoute({ children }: { children: React.ReactNode }) {
  const { user, isLoading } = useAuth();

  if (isLoading) {
    return (
      <div className="min-h-screen flex items-center justify-center">
        <div className="animate-spin h-8 w-8 border-4 border-blue-600 border-t-transparent rounded-full" />
      </div>
    );
  }

  if (!user) {
    return <Navigate to="/login" replace />;
  }

  return <>{children}</>;
}

6.6 路由更新

// frontend/src/App.tsx
// 添加 ProtectedRoute
import { ProtectedRoute } from "@/components/ProtectedRoute";

// 在 Routes 中包裹需要登录的页面
<Route element={<ProtectedRoute><Layout /></ProtectedRoute>}>
  <Route path="/dashboard" element={<Dashboard />} />
  <Route path="/knowledge-bases/:id" element={<KnowledgeBaseDetail />} />
  <Route path="/chat" element={<Chat />} />
</Route>

7. 验证

7.1 测试注册

# 注册
curl -X POST http://localhost:8000/api/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"123456","nickname":"测试用户"}'

# 响应
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...","token_type":"bearer","user":{"id":"uuid","email":"test@example.com","nickname":"测试用户","avatar_url":""}}

7.2 测试登录

# 登录
curl -X POST http://localhost:8000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"123456"}'

# 使用 Token 访问受保护接口
curl http://localhost:8000/api/auth/me \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

7.3 测试错误密码

# 错误密码
curl -X POST http://localhost:8000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"wrong"}'

# 响应 401
{"detail":"邮箱或密码错误"}

8. 常见安全问题

问题 解决方案
密码明文传输 HTTPS + 后端接收明文密码(不要前端哈希)
Token 被盗 设置合理的过期时间(72h),HTTPS 传输
暴力破解 登录失败计数 + 限流(后文实现)
SQL 注入 使用 ORM(SQLAlchemy)参数化查询
XSS 前端不要用 dangerouslySetInnerHTML,用安全的 Markdown 渲染

关于密码策略:目前只要求 6 位密码。实际产品建议:

  • 最少 8 位
  • 至少包含字母和数字
  • 提供密码强度指示器(前端实时检测)
  • 但不强制特殊字符(这只会让用户写密码在便利贴上)

总结

今天完成了完整的用户系统:

组件 说明
密码加密 bcrypt 哈希存储
JWT Token 签发/验证/过期
注册接口 邮箱唯一性检查 + 密码规范
登录接口 密码验证 + Token 返回
认证中间件 保护需要登录的接口
前端登录页 完整的登录表单 + 错误处理
前端注册页 完整的注册表单 + 密码确认
Auth Context 全局认证状态管理
路由保护 未登录自动跳转登录页

现在用户可以用邮箱注册、登录,前端会保存 Token 并在后续请求中自动带上。

下一篇我们将实现知识库与文档管理——用户可以创建知识库、上传文件。


本文是 《AI 全栈开发实战——做一个真正的产品》 系列的第 3 篇。
系列目录:

  1. ✅ 产品定义与架构设计
  2. ✅ 技术选型与项目初始化
  3. ✅ 用户系统(注册/登录/JWT)← 你在这里
  4. 📝 知识库与文档管理
  5. 📝 文档处理 Pipeline

本文由 Zyentor(智元界) 原创发布


本文发布于 Zyentor(智元界) —— AI 开发者社区

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐