环境声明

  • Python 版本Python 3.12+
  • FastAPI 版本FastAPI 0.115+
  • SQLAlchemy 版本SQLAlchemy 2.0+
  • Pydantic 版本Pydantic 2.0+
  • 数据库PostgreSQL 16+
  • 缓存Redis 7+
  • 前端Vue.js 3.xReact 18+
  • 部署Docker 24+Kubernetes 1.28+

学习目标

学完本讲,你将能够:

  1. 进行完整的需求分析和系统架构设计
  2. 设计合理的数据库模型和关系
  3. 开发 RESTful API 接口并实现业务逻辑
  4. 实现用户认证、权限管理和内容管理
  5. 集成前端界面并完成前后端联调
  6. 完成生产环境的容器化部署

1. 项目概述与架构设计

1.1 需求分析

功能需求

模块 功能 优先级
用户系统 注册、登录、JWT认证、个人中心
文章管理 发布、编辑、删除、草稿箱
分类标签 文章分类、标签管理
评论系统 评论、回复、审核
搜索功能 全文搜索、筛选排序
文件管理 图片上传、资源管理
后台管理 数据统计、内容审核
SEO优化 站点地图、Meta标签

非功能需求

  • 支持 1000+ 并发用户
  • API 响应时间 < 200ms
  • 系统可用性 99.9%
  • 支持水平扩展

1.2 系统架构

┌─────────────────────────────────────────────────────────────────┐
│                           客户端层                               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   Web App   │  │  Mobile Web │  │      Admin Panel        │  │
│  │  (Vue.js)   │  │  (Responsive)│  │      (Element Plus)     │  │
│  └──────┬──────┘  └──────┬──────┘  └───────────┬─────────────┘  │
└─────────┼────────────────┼─────────────────────┼────────────────┘
          │                │                     │
          └────────────────┴─────────────────────┘
                           │
┌──────────────────────────┼──────────────────────────────────────┐
│                      CDN / Nginx (静态资源/负载均衡)              │
└──────────────────────────┼──────────────────────────────────────┘
                           │
┌──────────────────────────┼──────────────────────────────────────┐
│                      API Gateway (Kong/Nginx)                    │
│              限流、认证、路由、日志                               │
└──────────────────────────┼──────────────────────────────────────┘
                           │
┌──────────────────────────┼──────────────────────────────────────┐
│                      应用服务层 (FastAPI)                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   用户服务   │  │   文章服务   │  │      评论服务            │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
└──────────────────────────┼──────────────────────────────────────┘
                           │
┌──────────────────────────┼──────────────────────────────────────┐
│                      数据层                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │  PostgreSQL │  │    Redis    │  │   Elasticsearch         │  │
│  │  (主数据库)  │  │  (缓存/会话) │  │   (全文搜索)            │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

1.3 技术选型

层级 技术 说明
后端框架 FastAPI 高性能异步框架
数据库 ORM SQLAlchemy 2.0 异步数据库操作
缓存 Redis 会话、缓存、限流
搜索 Elasticsearch 全文搜索
消息队列 Redis/RabbitMQ 异步任务
任务调度 Celery 定时任务、后台处理
对象存储 MinIO/AWS S3 文件存储
监控 Prometheus + Grafana 指标监控
日志 ELK Stack 日志收集分析

2. 项目结构与初始化

2.1 目录结构

enterprise-blog/
├── app/
│   ├── __init__.py
│   ├── main.py                 # 应用入口
│   ├── config.py               # 配置管理
│   ├── database.py             # 数据库连接
│   ├── cache.py                # 缓存配置
│   ├── models/                 # 数据模型
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   ├── category.py
│   │   ├── tag.py
│   │   └── comment.py
│   ├── schemas/                # Pydantic 模型
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   └── common.py
│   ├── api/                    # API 路由
│   │   ├── __init__.py
│   │   ├── deps.py             # 依赖注入
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── users.py
│   │   │   ├── posts.py
│   │   │   ├── categories.py
│   │   │   ├── tags.py
│   │   │   └── comments.py
│   ├── services/               # 业务逻辑层
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   ├── post_service.py
│   │   └── search_service.py
│   ├── core/                   # 核心功能
│   │   ├── __init__.py
│   │   ├── security.py         # 安全相关
│   │   ├── exceptions.py       # 自定义异常
│   │   └── middleware.py       # 中间件
│   └── utils/                  # 工具函数
│       ├── __init__.py
│       ├── pagination.py
│       └── validators.py
├── alembic/                    # 数据库迁移
├── tests/                      # 测试代码
├── docker/                     # Docker 配置
├── k8s/                        # Kubernetes 配置
├── scripts/                    # 脚本文件
├── requirements.txt
├── requirements-dev.txt
├── Dockerfile
├── docker-compose.yml
└── README.md

2.2 配置文件

# app/config.py
"""
应用配置管理
"""

from pydantic_settings import BaseSettings
from functools import lru_cache
from typing import List


class Settings(BaseSettings):
    """应用配置"""
    
    # 应用信息
    APP_NAME: str = "Enterprise Blog"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    
    # 安全配置
    SECRET_KEY: str
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    ALGORITHM: str = "HS256"
    
    # 数据库配置
    DATABASE_URL: str = "postgresql+asyncpg://user:pass@localhost/blog"
    DATABASE_POOL_SIZE: int = 20
    DATABASE_MAX_OVERFLOW: int = 10
    
    # Redis 配置
    REDIS_URL: str = "redis://localhost:6379/0"
    REDIS_POOL_SIZE: int = 50
    
    # Elasticsearch 配置
    ELASTICSEARCH_URL: str = "http://localhost:9200"
    
    # 文件存储配置
    MINIO_ENDPOINT: str = "localhost:9000"
    MINIO_ACCESS_KEY: str = ""
    MINIO_SECRET_KEY: str = ""
    MINIO_BUCKET_NAME: str = "blog-media"
    
    # 邮件配置
    SMTP_HOST: str = ""
    SMTP_PORT: int = 587
    SMTP_USER: str = ""
    SMTP_PASSWORD: str = ""
    
    # CORS 配置
    CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:5173"]
    
    # 分页配置
    DEFAULT_PAGE_SIZE: int = 20
    MAX_PAGE_SIZE: int = 100
    
    # 缓存配置
    CACHE_DEFAULT_TIMEOUT: int = 300  # 5分钟
    
    class Config:
        env_file = ".env"
        case_sensitive = True


@lru_cache()
def get_settings() -> Settings:
    """获取配置实例(缓存)"""
    return Settings()


settings = get_settings()

2.3 数据库配置

# app/database.py
"""
数据库连接配置
"""

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool
from app.config import settings

# 创建异步引擎
engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=settings.DATABASE_POOL_SIZE,
    max_overflow=settings.DATABASE_MAX_OVERFLOW,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=settings.DEBUG,
    future=True
)

# 创建会话工厂
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False
)

# 声明基类
Base = declarative_base()


async def get_db():
    """获取数据库会话(依赖注入)"""
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()


async def init_db():
    """初始化数据库(创建表)"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

3. 数据模型设计

3.1 用户模型

# app/models/user.py
"""
用户模型
"""

from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, Enum
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
import enum


class UserRole(str, enum.Enum):
    """用户角色"""
    READER = "reader"      # 读者
    AUTHOR = "author"      # 作者
    EDITOR = "editor"      # 编辑
    ADMIN = "admin"        # 管理员


class UserStatus(str, enum.Enum):
    """用户状态"""
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"


class User(Base):
    """用户表"""
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    
    # 个人信息
    nickname = Column(String(50), nullable=True)
    avatar = Column(String(255), nullable=True)
    bio = Column(Text, nullable=True)
    website = Column(String(255), nullable=True)
    
    # 角色和状态
    role = Column(Enum(UserRole), default=UserRole.READER, nullable=False)
    status = Column(Enum(UserStatus), default=UserStatus.ACTIVE, nullable=False)
    
    # 统计信息
    post_count = Column(Integer, default=0)
    comment_count = Column(Integer, default=0)
    
    # 时间戳
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)
    
    # 软删除
    is_deleted = Column(Boolean, default=False)
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    
    # 关系
    posts = relationship("Post", back_populates="author", lazy="selectin")
    comments = relationship("Comment", back_populates="author", lazy="selectin")
    
    def __repr__(self):
        return f"<User {self.username}>"

3.2 文章模型

# app/models/post.py
"""
文章模型
"""

from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, Table
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
import enum


class PostStatus(str, enum.Enum):
    """文章状态"""
    DRAFT = "draft"           # 草稿
    PUBLISHED = "published"   # 已发布
    ARCHIVED = "archived"     # 已归档


# 文章-标签关联表
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True)
)


class Post(Base):
    """文章表"""
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    
    # 基本信息
    title = Column(String(200), nullable=False)
    slug = Column(String(200), unique=True, index=True, nullable=False)
    summary = Column(String(500), nullable=True)
    content = Column(Text, nullable=False)
    content_html = Column(Text, nullable=True)  # 渲染后的 HTML
    
    # 封面图
    cover_image = Column(String(255), nullable=True)
    
    # 状态和元数据
    status = Column(String(20), default=PostStatus.DRAFT, nullable=False)
    is_top = Column(Boolean, default=False)  # 是否置顶
    allow_comment = Column(Boolean, default=True)
    
    # 统计信息
    view_count = Column(Integer, default=0)
    like_count = Column(Integer, default=0)
    comment_count = Column(Integer, default=0)
    
    # 外键
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
    
    # 时间戳
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    published_at = Column(DateTime(timezone=True), nullable=True)
    
    # 软删除
    is_deleted = Column(Boolean, default=False)
    
    # 关系
    author = relationship("User", back_populates="posts")
    category = relationship("Category", back_populates="posts")
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")
    comments = relationship("Comment", back_populates="post", lazy="selectin")
    
    def __repr__(self):
        return f"<Post {self.title}>"

3.3 分类和标签模型

# app/models/category.py
"""
分类模型
"""

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base


class Category(Base):
    """分类表"""
    __tablename__ = "categories"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    slug = Column(String(50), unique=True, index=True, nullable=False)
    description = Column(String(200), nullable=True)
    
    # 层级结构
    parent_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
    sort_order = Column(Integer, default=0)
    
    # 统计
    post_count = Column(Integer, default=0)
    
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    
    # 关系
    posts = relationship("Post", back_populates="category")
    children = relationship("Category", backref="parent", remote_side=[id])
    
    def __repr__(self):
        return f"<Category {self.name}>"


# app/models/tag.py
"""
标签模型
"""

from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base


class Tag(Base):
    """标签表"""
    __tablename__ = "tags"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(30), unique=True, nullable=False)
    slug = Column(String(30), unique=True, index=True, nullable=False)
    
    # 统计
    post_count = Column(Integer, default=0)
    
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    
    # 关系
    posts = relationship("Post", secondary="post_tags", back_populates="tags")
    
    def __repr__(self):
        return f"<Tag {self.name}>"

3.4 评论模型

# app/models/comment.py
"""
评论模型
"""

from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
import enum


class CommentStatus(str, enum.Enum):
    """评论状态"""
    PENDING = "pending"      # 待审核
    APPROVED = "approved"    # 已通过
    REJECTED = "rejected"    # 已拒绝


class Comment(Base):
    """评论表"""
    __tablename__ = "comments"
    
    id = Column(Integer, primary_key=True, index=True)
    
    # 内容
    content = Column(Text, nullable=False)
    content_html = Column(Text, nullable=True)
    
    # 状态
    status = Column(String(20), default=CommentStatus.APPROVED, nullable=False)
    
    # 外键
    post_id = Column(Integer, ForeignKey("posts.id", ondelete="CASCADE"), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=True)  # 游客可为空
    parent_id = Column(Integer, ForeignKey("comments.id", ondelete="CASCADE"), nullable=True)
    
    # 游客信息(未登录用户)
    guest_name = Column(String(50), nullable=True)
    guest_email = Column(String(100), nullable=True)
    guest_website = Column(String(255), nullable=True)
    
    # 点赞数
    like_count = Column(Integer, default=0)
    
    # 时间戳
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # 关系
    post = relationship("Post", back_populates="comments")
    author = relationship("User", back_populates="comments")
    parent = relationship("Comment", remote_side="Comment.id", backref="replies")
    
    def __repr__(self):
        return f"<Comment {self.id}>"

4. Pydantic 模型定义

4.1 基础模型

# app/schemas/common.py
"""
通用 Pydantic 模型
"""

from pydantic import BaseModel, ConfigDict
from typing import Generic, TypeVar, List, Optional
from datetime import datetime

T = TypeVar('T')


class ResponseBase(BaseModel):
    """基础响应模型"""
    model_config = ConfigDict(from_attributes=True)


class PaginationParams(BaseModel):
    """分页参数"""
    page: int = 1
    page_size: int = 20
    
    @property
    def offset(self) -> int:
        return (self.page - 1) * self.page_size


class PaginatedResponse(ResponseBase, Generic[T]):
    """分页响应模型"""
    items: List[T]
    total: int
    page: int
    page_size: int
    pages: int
    
    @property
    def has_next(self) -> bool:
        return self.page < self.pages
    
    @property
    def has_prev(self) -> bool:
        return self.page > 1


class TimestampMixin(ResponseBase):
    """时间戳混入"""
    created_at: datetime
    updated_at: Optional[datetime] = None

4.2 用户模型

# app/schemas/user.py
"""
用户 Pydantic 模型
"""

from pydantic import BaseModel, EmailStr, Field, ConfigDict
from typing import Optional, List
from datetime import datetime
from app.models.user import UserRole, UserStatus
from app.schemas.common import ResponseBase, TimestampMixin


# ========== 请求模型 ==========

class UserCreate(BaseModel):
    """用户注册请求"""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=100)
    nickname: Optional[str] = Field(None, max_length=50)


class UserLogin(BaseModel):
    """用户登录请求"""
    username: str
    password: str


class UserUpdate(BaseModel):
    """用户更新请求"""
    nickname: Optional[str] = Field(None, max_length=50)
    bio: Optional[str] = Field(None, max_length=500)
    website: Optional[str] = Field(None, max_length=255)
    avatar: Optional[str] = None


class PasswordChange(BaseModel):
    """密码修改请求"""
    old_password: str
    new_password: str = Field(..., min_length=8, max_length=100)


# ========== 响应模型 ==========

class UserBase(ResponseBase):
    """用户基础信息"""
    id: int
    username: str
    nickname: Optional[str] = None
    avatar: Optional[str] = None


class UserResponse(UserBase, TimestampMixin):
    """用户详细信息响应"""
    email: EmailStr
    bio: Optional[str] = None
    website: Optional[str] = None
    role: UserRole
    post_count: int = 0
    comment_count: int = 0
    last_login: Optional[datetime] = None


class UserProfile(UserBase):
    """用户公开资料"""
    bio: Optional[str] = None
    website: Optional[str] = None
    post_count: int = 0
    created_at: datetime


class TokenResponse(BaseModel):
    """Token 响应"""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int

4.3 文章模型

# app/schemas/post.py
"""
文章 Pydantic 模型
"""

from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, List
from datetime import datetime
from app.models.post import PostStatus
from app.schemas.common import ResponseBase, TimestampMixin
from app.schemas.user import UserBase


# ========== 请求模型 ==========

class PostCreate(BaseModel):
    """创建文章请求"""
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1)
    summary: Optional[str] = Field(None, max_length=500)
    cover_image: Optional[str] = None
    category_id: Optional[int] = None
    tag_ids: List[int] = []
    status: PostStatus = PostStatus.DRAFT
    is_top: bool = False
    allow_comment: bool = True


class PostUpdate(BaseModel):
    """更新文章请求"""
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    content: Optional[str] = None
    summary: Optional[str] = Field(None, max_length=500)
    cover_image: Optional[str] = None
    category_id: Optional[int] = None
    tag_ids: Optional[List[int]] = None
    status: Optional[PostStatus] = None
    is_top: Optional[bool] = None
    allow_comment: Optional[bool] = None


class PostListFilter(BaseModel):
    """文章列表筛选"""
    category_id: Optional[int] = None
    tag_id: Optional[int] = None
    author_id: Optional[int] = None
    status: Optional[PostStatus] = PostStatus.PUBLISHED
    keyword: Optional[str] = None


# ========== 响应模型 ==========

class CategoryBrief(ResponseBase):
    """分类简要信息"""
    id: int
    name: str
    slug: str


class TagBrief(ResponseBase):
    """标签简要信息"""
    id: int
    name: str
    slug: str


class PostBrief(ResponseBase, TimestampMixin):
    """文章列表项"""
    id: int
    title: str
    slug: str
    summary: Optional[str] = None
    cover_image: Optional[str] = None
    status: PostStatus
    is_top: bool
    view_count: int
    like_count: int
    comment_count: int
    author: UserBase
    category: Optional[CategoryBrief] = None


class PostDetail(PostBrief):
    """文章详情"""
    content: str
    content_html: Optional[str] = None
    allow_comment: bool
    published_at: Optional[datetime] = None
    tags: List[TagBrief] = []


class PostArchive(ResponseBase):
    """文章归档"""
    year: int
    month: int
    count: int

5. API 接口开发

5.1 认证依赖

# app/api/deps.py
"""
API 依赖注入
"""

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from jose import JWTError, jwt
from app.database import get_db
from app.config import settings
from app.models.user import User, UserRole
from app.services.user_service import UserService

# 安全方案
security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db)
) -> User:
    """获取当前登录用户"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM]
        )
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    user_service = UserService(db)
    user = await user_service.get_by_id(int(user_id))
    
    if user is None or user.status != "active":
        raise credentials_exception
    
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """获取当前活跃用户"""
    if current_user.is_deleted:
        raise HTTPException(status_code=400, detail="用户已被删除")
    return current_user


class RoleChecker:
    """角色检查器"""
    
    def __init__(self, allowed_roles: list):
        self.allowed_roles = allowed_roles
    
    def __call__(self, user: User = Depends(get_current_active_user)):
        if user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="权限不足"
            )
        return user


# 常用角色检查依赖
require_admin = RoleChecker([UserRole.ADMIN])
require_editor = RoleChecker([UserRole.ADMIN, UserRole.EDITOR])
require_author = RoleChecker([UserRole.ADMIN, UserRole.EDITOR, UserRole.AUTHOR])

5.2 认证接口

# app/api/v1/auth.py
"""
认证相关接口
"""

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import timedelta
from app.database import get_db
from app.config import settings
from app.schemas.user import UserCreate, UserResponse, TokenResponse, UserLogin
from app.services.user_service import UserService
from app.core.security import create_access_token, create_refresh_token, verify_password

router = APIRouter(prefix="/auth", tags=["认证"])


@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    """用户注册"""
    user_service = UserService(db)
    
    # 检查用户名是否已存在
    if await user_service.get_by_username(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户名已存在"
        )
    
    # 检查邮箱是否已存在
    if await user_service.get_by_email(user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="邮箱已被注册"
        )
    
    # 创建用户
    user = await user_service.create(user_data)
    return user


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """用户登录"""
    user_service = UserService(db)
    
    # 验证用户
    user = await user_service.get_by_username(form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    if user.status != "active":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="账户已被禁用"
        )
    
    # 更新最后登录时间
    await user_service.update_last_login(user.id)
    
    # 生成 Token
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "role": user.role},
        expires_delta=access_token_expires
    )
    
    refresh_token_expires = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    refresh_token = create_refresh_token(
        data={"sub": str(user.id)},
        expires_delta=refresh_token_expires
    )
    
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db)
):
    """刷新访问令牌"""
    from jose import jwt
    
    try:
        payload = jwt.decode(
            refresh_token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM]
        )
        user_id: str = payload.get("sub")
        token_type: str = payload.get("type")
        
        if user_id is None or token_type != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的刷新令牌"
            )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的刷新令牌"
        )
    
    # 验证用户仍然存在
    user_service = UserService(db)
    user = await user_service.get_by_id(int(user_id))
    if not user or user.status != "active":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在或已被禁用"
        )
    
    # 生成新的访问令牌
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "role": user.role},
        expires_delta=access_token_expires
    )
    
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )

5.3 文章接口

# app/api/v1/posts.py
"""
文章相关接口
"""

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from app.database import get_db
from app.schemas.post import (
    PostCreate, PostUpdate, PostDetail, PostBrief,
    PostListFilter, PostArchive
)
from app.schemas.common import PaginatedResponse, PaginationParams
from app.services.post_service import PostService
from app.models.user import User
from app.api.deps import get_current_active_user, require_author

router = APIRouter(prefix="/posts", tags=["文章"])


@router.get("", response_model=PaginatedResponse[PostBrief])
async def list_posts(
    category_id: Optional[int] = None,
    tag_id: Optional[int] = None,
    author_id: Optional[int] = None,
    keyword: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """获取文章列表"""
    filters = PostListFilter(
        category_id=category_id,
        tag_id=tag_id,
        author_id=author_id,
        keyword=keyword
    )
    pagination = PaginationParams(page=page, page_size=page_size)
    
    post_service = PostService(db)
    result = await post_service.get_list(filters, pagination)
    return result


@router.get("/archives", response_model=List[PostArchive])
async def get_archives(
    db: AsyncSession = Depends(get_db)
):
    """获取文章归档"""
    post_service = PostService(db)
    return await post_service.get_archives()


@router.get("/{slug}", response_model=PostDetail)
async def get_post(
    slug: str,
    db: AsyncSession = Depends(get_db)
):
    """获取文章详情"""
    post_service = PostService(db)
    post = await post_service.get_by_slug(slug)
    
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 增加浏览量
    await post_service.increment_view_count(post.id)
    
    return post


@router.post("", response_model=PostDetail, status_code=status.HTTP_201_CREATED)
async def create_post(
    post_data: PostCreate,
    current_user: User = Depends(require_author),
    db: AsyncSession = Depends(get_db)
):
    """创建文章"""
    post_service = PostService(db)
    post = await post_service.create(post_data, current_user.id)
    return post


@router.put("/{post_id}", response_model=PostDetail)
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """更新文章"""
    post_service = PostService(db)
    
    # 获取文章
    post = await post_service.get_by_id(post_id)
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 权限检查:只有作者、编辑或管理员可以修改
    if post.author_id != current_user.id and current_user.role not in ["admin", "editor"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权修改此文章"
        )
    
    updated_post = await post_service.update(post_id, post_data)
    return updated_post


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """删除文章(软删除)"""
    post_service = PostService(db)
    
    post = await post_service.get_by_id(post_id)
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="文章不存在"
        )
    
    # 权限检查
    if post.author_id != current_user.id and current_user.role not in ["admin", "editor"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权删除此文章"
        )
    
    await post_service.delete(post_id)
    return None

5.4 服务层实现

# app/services/post_service.py
"""
文章服务层
"""

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc, and_
from sqlalchemy.orm import selectinload
from typing import Optional, List
from slugify import slugify
from datetime import datetime
from app.models.post import Post, PostStatus
from app.models.tag import Tag
from app.schemas.post import PostCreate, PostUpdate, PostListFilter
from app.schemas.common import PaginatedResponse, PaginationParams


class PostService:
    """文章服务"""
    
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def get_by_id(self, post_id: int) -> Optional[Post]:
        """根据 ID 获取文章"""
        result = await self.db.execute(
            select(Post)
            .where(Post.id == post_id, Post.is_deleted == False)
            .options(
                selectinload(Post.author),
                selectinload(Post.category),
                selectinload(Post.tags)
            )
        )
        return result.scalar_one_or_none()
    
    async def get_by_slug(self, slug: str) -> Optional[Post]:
        """根据 slug 获取文章"""
        result = await self.db.execute(
            select(Post)
            .where(
                Post.slug == slug,
                Post.is_deleted == False,
                Post.status == PostStatus.PUBLISHED
            )
            .options(
                selectinload(Post.author),
                selectinload(Post.category),
                selectinload(Post.tags),
                selectinload(Post.comments)
            )
        )
        return result.scalar_one_or_none()
    
    async def get_list(
        self,
        filters: PostListFilter,
        pagination: PaginationParams
    ) -> PaginatedResponse:
        """获取文章列表"""
        # 构建查询
        query = select(Post).where(
            Post.is_deleted == False,
            Post.status == filters.status or PostStatus.PUBLISHED
        )
        
        # 应用筛选条件
        if filters.category_id:
            query = query.where(Post.category_id == filters.category_id)
        
        if filters.author_id:
            query = query.where(Post.author_id == filters.author_id)
        
        if filters.keyword:
            search = f"%{filters.keyword}%"
            query = query.where(
                Post.title.ilike(search) | Post.summary.ilike(search)
            )
        
        # 标签筛选(需要关联查询)
        if filters.tag_id:
            query = query.join(Post.tags).where(Tag.id == filters.tag_id)
        
        # 计算总数
        count_query = select(func.count()).select_from(query.subquery())
        total_result = await self.db.execute(count_query)
        total = total_result.scalar()
        
        # 排序和分页
        query = (
            query
            .order_by(desc(Post.is_top), desc(Post.published_at), desc(Post.created_at))
            .offset(pagination.offset)
            .limit(pagination.page_size)
            .options(
                selectinload(Post.author),
                selectinload(Post.category)
            )
        )
        
        result = await self.db.execute(query)
        items = result.scalars().all()
        
        pages = (total + pagination.page_size - 1) // pagination.page_size
        
        return PaginatedResponse(
            items=list(items),
            total=total,
            page=pagination.page,
            page_size=pagination.page_size,
            pages=pages
        )
    
    async def create(self, post_data: PostCreate, author_id: int) -> Post:
        """创建文章"""
        # 生成 slug
        base_slug = slugify(post_data.title)
        slug = base_slug
        counter = 1
        
        # 确保 slug 唯一
        while await self.get_by_slug(slug):
            slug = f"{base_slug}-{counter}"
            counter += 1
        
        # 处理发布时间
        published_at = None
        if post_data.status == PostStatus.PUBLISHED:
            published_at = datetime.utcnow()
        
        # 创建文章
        post = Post(
            title=post_data.title,
            slug=slug,
            summary=post_data.summary,
            content=post_data.content,
            cover_image=post_data.cover_image,
            status=post_data.status,
            is_top=post_data.is_top,
            allow_comment=post_data.allow_comment,
            author_id=author_id,
            category_id=post_data.category_id,
            published_at=published_at
        )
        
        self.db.add(post)
        await self.db.flush()
        
        # 关联标签
        if post_data.tag_ids:
            tags_result = await self.db.execute(
                select(Tag).where(Tag.id.in_(post_data.tag_ids))
            )
            tags = tags_result.scalars().all()
            post.tags = tags
        
        await self.db.commit()
        await self.db.refresh(post)
        
        return post
    
    async def update(self, post_id: int, post_data: PostUpdate) -> Post:
        """更新文章"""
        post = await self.get_by_id(post_id)
        if not post:
            return None
        
        update_data = post_data.model_dump(exclude_unset=True)
        
        # 处理标签更新
        tag_ids = update_data.pop('tag_ids', None)
        if tag_ids is not None:
            tags_result = await self.db.execute(
                select(Tag).where(Tag.id.in_(tag_ids))
            )
            post.tags = tags_result.scalars().all()
        
        # 处理状态变更
        if 'status' in update_data:
            if update_data['status'] == PostStatus.PUBLISHED and not post.published_at:
                post.published_at = datetime.utcnow()
        
        # 更新其他字段
        for field, value in update_data.items():
            setattr(post, field, value)
        
        await self.db.commit()
        await self.db.refresh(post)
        
        return post
    
    async def delete(self, post_id: int) -> bool:
        """软删除文章"""
        post = await self.get_by_id(post_id)
        if not post:
            return False
        
        post.is_deleted = True
        await self.db.commit()
        return True
    
    async def increment_view_count(self, post_id: int) -> None:
        """增加浏览量"""
        await self.db.execute(
            select(Post).where(Post.id == post_id)
        )
        post = await self.get_by_id(post_id)
        if post:
            post.view_count += 1
            await self.db.commit()
    
    async def get_archives(self) -> List[dict]:
        """获取文章归档统计"""
        result = await self.db.execute(
            select(
                func.extract('year', Post.published_at).label('year'),
                func.extract('month', Post.published_at).label('month'),
                func.count().label('count')
            )
            .where(
                Post.status == PostStatus.PUBLISHED,
                Post.is_deleted == False
            )
            .group_by('year', 'month')
            .order_by(desc('year'), desc('month'))
        )
        
        archives = []
        for row in result.all():
            archives.append({
                "year": int(row.year),
                "month": int(row.month),
                "count": row.count
            })
        
        return archives

6. 核心业务功能

6.1 全文搜索服务

# app/services/search_service.py
"""
Elasticsearch 全文搜索服务
"""

from elasticsearch import AsyncElasticsearch
from typing import List, Optional
from app.config import settings


class SearchService:
    """搜索服务"""
    
    def __init__(self):
        self.client = AsyncElasticsearch([settings.ELASTICSEARCH_URL])
        self.index_name = "blog_posts"
    
    async def create_index(self):
        """创建索引"""
        mapping = {
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "standard",
                        "fields": {
                            "keyword": {"type": "keyword"}
                        }
                    },
                    "content": {
                        "type": "text",
                        "analyzer": "standard"
                    },
                    "summary": {"type": "text"},
                    "tags": {"type": "keyword"},
                    "category": {"type": "keyword"},
                    "author": {"type": "keyword"},
                    "published_at": {"type": "date"},
                    "status": {"type": "keyword"}
                }
            }
        }
        
        if not await self.client.indices.exists(index=self.index_name):
            await self.client.indices.create(index=self.index_name, body=mapping)
    
    async def index_post(self, post: dict):
        """索引文章"""
        await self.client.index(
            index=self.index_name,
            id=post["id"],
            body=post
        )
    
    async def search(
        self,
        query: str,
        page: int = 1,
        page_size: int = 20
    ) -> dict:
        """搜索文章"""
        search_body = {
            "from": (page - 1) * page_size,
            "size": page_size,
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "summary^2", "content", "tags"],
                    "type": "best_fields",
                    "fuzziness": "AUTO"
                }
            },
            "highlight": {
                "fields": {
                    "title": {},
                    "summary": {},
                    "content": {"fragment_size": 150, "number_of_fragments": 3}
                }
            },
            "sort": [
                {"_score": {"order": "desc"}},
                {"published_at": {"order": "desc"}}
            ]
        }
        
        response = await self.client.search(
            index=self.index_name,
            body=search_body
        )
        
        hits = response["hits"]["hits"]
        total = response["hits"]["total"]["value"]
        
        results = []
        for hit in hits:
            results.append({
                "id": hit["_id"],
                "score": hit["_score"],
                "source": hit["_source"],
                "highlight": hit.get("highlight", {})
            })
        
        return {
            "items": results,
            "total": total,
            "page": page,
            "page_size": page_size,
            "pages": (total + page_size - 1) // page_size
        }
    
    async def delete_post(self, post_id: int):
        """删除索引"""
        await self.client.delete(index=self.index_name, id=post_id)
    
    async def close(self):
        """关闭连接"""
        await self.client.close()

6.2 缓存装饰器

# app/cache.py
"""
缓存配置和装饰器
"""

import redis.asyncio as redis
import json
import hashlib
from functools import wraps
from typing import Optional, Callable, Any
from app.config import settings

# Redis 客户端
redis_client = redis.Redis.from_url(
    settings.REDIS_URL,
    encoding="utf-8",
    decode_responses=True,
    max_connections=settings.REDIS_POOL_SIZE
)


def generate_cache_key(prefix: str, *args, **kwargs) -> str:
    """生成缓存键"""
    key_data = f"{prefix}:{str(args)}:{str(kwargs)}"
    return hashlib.md5(key_data.encode()).hexdigest()


def cached(
    prefix: str,
    expire: int = 300,
    key_builder: Optional[Callable] = None
):
    """缓存装饰器"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 生成缓存键
            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                cache_key = generate_cache_key(prefix, *args, **kwargs)
            
            # 尝试从缓存获取
            try:
                cached_value = await redis_client.get(cache_key)
                if cached_value:
                    return json.loads(cached_value)
            except Exception:
                pass  # 缓存错误不影响主流程
            
            # 执行函数
            result = await func(*args, **kwargs)
            
            # 存入缓存
            try:
                await redis_client.setex(
                    cache_key,
                    expire,
                    json.dumps(result, default=str)
                )
            except Exception:
                pass
            
            return result
        return wrapper
    return decorator


def cache_delete_pattern(pattern: str):
    """删除匹配模式的缓存"""
    async def delete():
        keys = await redis_client.keys(pattern)
        if keys:
            await redis_client.delete(*keys)
    return delete


# 使用示例
class PostService:
    @cached("post", expire=600)
    async def get_by_slug(self, slug: str):
        # 从数据库获取
        return await self._fetch_from_db(slug)
    
    async def update(self, post_id: int, data: dict):
        # 更新数据库
        result = await self._update_db(post_id, data)
        # 清除相关缓存
        await cache_delete_pattern("post:*")
        return result

7. 前端集成

7.1 API 客户端

// frontend/src/api/client.ts
// API 客户端配置

import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';

const API_BASE_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';

class ApiClient {
  private client: AxiosInstance;

  constructor() {
    this.client = axios.create({
      baseURL: API_BASE_URL,
      timeout: 10000,
      headers: {
        'Content-Type': 'application/json',
      },
    });

    // 请求拦截器
    this.client.interceptors.request.use(
      (config) => {
        const token = localStorage.getItem('access_token');
        if (token) {
          config.headers.Authorization = `Bearer ${token}`;
        }
        return config;
      },
      (error) => Promise.reject(error)
    );

    // 响应拦截器
    this.client.interceptors.response.use(
      (response) => response.data,
      async (error) => {
        const originalRequest = error.config;

        // Token 过期,尝试刷新
        if (error.response?.status === 401 && !originalRequest._retry) {
          originalRequest._retry = true;
          try {
            const refreshToken = localStorage.getItem('refresh_token');
            const response = await this.client.post('/auth/refresh', {
              refresh_token: refreshToken,
            });
            
            localStorage.setItem('access_token', response.access_token);
            originalRequest.headers.Authorization = `Bearer ${response.access_token}`;
            
            return this.client(originalRequest);
          } catch (refreshError) {
            // 刷新失败,清除 token 并跳转登录
            localStorage.removeItem('access_token');
            localStorage.removeItem('refresh_token');
            window.location.href = '/login';
            return Promise.reject(refreshError);
          }
        }

        return Promise.reject(error);
      }
    );
  }

  async get<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
    return this.client.get(url, config);
  }

  async post<T>(url: string, data?: any, config?: AxiosRequestConfig): Promise<T> {
    return this.client.post(url, data, config);
  }

  async put<T>(url: string, data?: any, config?: AxiosRequestConfig): Promise<T> {
    return this.client.put(url, data, config);
  }

  async delete<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
    return this.client.delete(url, config);
  }
}

export const apiClient = new ApiClient();

7.2 文章列表组件

<!-- frontend/src/views/PostList.vue -->
<template>
  <div class="post-list">
    <!-- 筛选栏 -->
    <div class="filters">
      <el-input
        v-model="searchQuery"
        placeholder="搜索文章..."
        @keyup.enter="handleSearch"
      >
        <template #append>
          <el-button @click="handleSearch">
            <el-icon><Search /></el-icon>
          </el-button>
        </template>
      </el-input>
      
      <el-select v-model="selectedCategory" placeholder="选择分类" clearable>
        <el-option
          v-for="cat in categories"
          :key="cat.id"
          :label="cat.name"
          :value="cat.id"
        />
      </el-select>
    </div>

    <!-- 文章列表 -->
    <div class="posts">
      <el-skeleton :rows="5" animated v-if="loading" />
      
      <template v-else>
        <article
          v-for="post in posts"
          :key="post.id"
          class="post-card"
          @click="goToDetail(post.slug)"
        >
          <div class="post-cover" v-if="post.cover_image">
            <img :src="post.cover_image" :alt="post.title" />
          </div>
          
          <div class="post-content">
            <h2 class="post-title">{{ post.title }}</h2>
            <p class="post-summary">{{ post.summary }}</p>
            
            <div class="post-meta">
              <span class="author">
                <el-avatar :size="24" :src="post.author.avatar" />
                {{ post.author.nickname || post.author.username }}
              </span>
              <span class="date">{{ formatDate(post.created_at) }}</span>
              <span class="views">
                <el-icon><View /></el-icon>
                {{ post.view_count }}
              </span>
              <span class="comments">
                <el-icon><ChatDotRound /></el-icon>
                {{ post.comment_count }}
              </span>
            </div>
            
            <div class="post-tags" v-if="post.tags?.length">
              <el-tag
                v-for="tag in post.tags"
                :key="tag.id"
                size="small"
                effect="plain"
              >
                {{ tag.name }}
              </el-tag>
            </div>
          </div>
        </article>
      </template>
    </div>

    <!-- 分页 -->
    <el-pagination
      v-model:current-page="currentPage"
      v-model:page-size="pageSize"
      :total="total"
      :page-sizes="[10, 20, 50]"
      layout="total, sizes, prev, pager, next"
      @change="handlePageChange"
    />
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted, watch } from 'vue';
import { useRouter } from 'vue-router';
import { Search, View, ChatDotRound } from '@element-plus/icons-vue';
import { ElMessage } from 'element-plus';
import { postApi } from '@/api/post';
import type { PostBrief, Category } from '@/types';

const router = useRouter();

// 状态
const posts = ref<PostBrief[]>([]);
const categories = ref<Category[]>([]);
const loading = ref(false);
const currentPage = ref(1);
const pageSize = ref(20);
const total = ref(0);
const searchQuery = ref('');
const selectedCategory = ref<number | null>(null);

// 获取文章列表
const fetchPosts = async () => {
  loading.value = true;
  try {
    const response = await postApi.getList({
      page: currentPage.value,
      page_size: pageSize.value,
      keyword: searchQuery.value || undefined,
      category_id: selectedCategory.value || undefined,
    });
    posts.value = response.items;
    total.value = response.total;
  } catch (error) {
    ElMessage.error('获取文章列表失败');
  } finally {
    loading.value = false;
  }
};

// 获取分类列表
const fetchCategories = async () => {
  try {
    categories.value = await postApi.getCategories();
  } catch (error) {
    console.error('获取分类失败', error);
  }
};

// 搜索
const handleSearch = () => {
  currentPage.value = 1;
  fetchPosts();
};

// 分页变化
const handlePageChange = () => {
  fetchPosts();
  window.scrollTo({ top: 0, behavior: 'smooth' });
};

// 跳转到详情
const goToDetail = (slug: string) => {
  router.push(`/post/${slug}`);
};

// 格式化日期
const formatDate = (date: string) => {
  return new Date(date).toLocaleDateString('zh-CN');
};

// 监听筛选条件变化
watch([selectedCategory], () => {
  currentPage.value = 1;
  fetchPosts();
});

onMounted(() => {
  fetchPosts();
  fetchCategories();
});
</script>

<style scoped>
.post-list {
  max-width: 1200px;
  margin: 0 auto;
  padding: 20px;
}

.filters {
  display: flex;
  gap: 16px;
  margin-bottom: 24px;
}

.post-card {
  display: flex;
  gap: 20px;
  padding: 20px;
  margin-bottom: 20px;
  background: #fff;
  border-radius: 8px;
  box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
  cursor: pointer;
  transition: transform 0.2s, box-shadow 0.2s;
}

.post-card:hover {
  transform: translateY(-2px);
  box-shadow: 0 4px 16px rgba(0, 0, 0, 0.15);
}

.post-cover {
  flex-shrink: 0;
  width: 200px;
  height: 150px;
  border-radius: 4px;
  overflow: hidden;
}

.post-cover img {
  width: 100%;
  height: 100%;
  object-fit: cover;
}

.post-content {
  flex: 1;
  display: flex;
  flex-direction: column;
}

.post-title {
  margin: 0 0 12px;
  font-size: 20px;
  color: #333;
}

.post-summary {
  flex: 1;
  margin: 0 0 12px;
  color: #666;
  line-height: 1.6;
  display: -webkit-box;
  -webkit-line-clamp: 3;
  -webkit-box-orient: vertical;
  overflow: hidden;
}

.post-meta {
  display: flex;
  align-items: center;
  gap: 16px;
  color: #999;
  font-size: 14px;
}

.post-meta .author {
  display: flex;
  align-items: center;
  gap: 8px;
}

.post-tags {
  margin-top: 12px;
  display: flex;
  gap: 8px;
}
</style>

8. 部署配置

8.1 Docker Compose 生产配置

# docker-compose.prod.yml
version: '3.8'

services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: blog-app
    restart: unless-stopped
    environment:
      - APP_ENV=production
      - DATABASE_URL=postgresql://postgres:${DB_PASSWORD}@db:5432/blog
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - db
      - redis
      - elasticsearch
    volumes:
      - ./static:/app/static
      - ./media:/app/media
    networks:
      - blog-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:16-alpine
    container_name: blog-db
    restart: unless-stopped
    environment:
      - POSTGRES_DB=blog
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - blog-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    container_name: blog-redis
    restart: unless-stopped
    volumes:
      - redis_data:/data
    networks:
      - blog-network

  elasticsearch:
    image: elasticsearch:8.11.0
    container_name: blog-es
    restart: unless-stopped
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - blog-network

  nginx:
    image: nginx:alpine
    container_name: blog-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
      - ./static:/var/www/static:ro
      - ./media:/var/www/media:ro
    depends_on:
      - app
    networks:
      - blog-network

volumes:
  postgres_data:
  redis_data:
  es_data:

networks:
  blog-network:
    driver: bridge

8.2 Kubernetes 部署配置

# k8s/production/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: blog-app
  namespace: blog-production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: blog-app
  template:
    metadata:
      labels:
        app: blog-app
    spec:
      containers:
        - name: app
          image: registry/blog-app:v1.0.0
          ports:
            - containerPort: 8000
          env:
            - name: APP_ENV
              value: "production"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: blog-secrets
                  key: database-url
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: blog-secrets
                  key: secret-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: blog-app-service
  namespace: blog-production
spec:
  selector:
    app: blog-app
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: blog-ingress
  namespace: blog-production
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  tls:
    - hosts:
        - blog.example.com
      secretName: blog-tls
  rules:
    - host: blog.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: blog-app-service
                port:
                  number: 80

避坑小贴士

1. 数据库连接池耗尽

# 问题:并发高时数据库连接不足
engine = create_async_engine(DATABASE_URL)  # 默认配置

# 正确:合理配置连接池
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,           # 基础连接数
    max_overflow=10,        # 额外连接数
    pool_pre_ping=True,     # 连接前检查
    pool_recycle=3600,      # 定期回收
    pool_timeout=30         # 获取连接超时
)

2. N+1 查询问题

# 问题:N+1 查询
posts = await db.execute(select(Post))
for post in posts:
    print(post.author.name)  # 每次循环都查询数据库!

# 正确:使用 selectinload 预加载
from sqlalchemy.orm import selectinload

result = await db.execute(
    select(Post)
    .options(selectinload(Post.author), selectinload(Post.tags))
)
posts = result.scalars().all()

3. 缓存穿透和雪崩

# 问题:缓存穿透(查询不存在的数据)
async def get_post(post_id: int):
    # 缓存中没有,数据库也没有,每次都要查数据库
    post = await cache.get(f"post:{post_id}")
    if not post:
        post = await db.get(Post, post_id)
        await cache.set(f"post:{post_id}", post)
    return post

# 正确:缓存空值 + 布隆过滤器
async def get_post(post_id: int):
    cache_key = f"post:{post_id}"
    
    # 检查布隆过滤器
    if not await bloom_filter.exists(cache_key):
        return None
    
    post = await cache.get(cache_key)
    if post == "__NULL__":
        return None
    if post:
        return post
    
    post = await db.get(Post, post_id)
    if post:
        await cache.set(cache_key, post, expire=3600)
    else:
        # 缓存空值,防止穿透
        await cache.set(cache_key, "__NULL__", expire=60)
    
    return post

4. 文件上传安全问题

# 问题:不安全的文件上传
@app.post("/upload")
async def upload(file: UploadFile):
    with open(f"/uploads/{file.filename}", "wb") as f:
        f.write(await file.read())  # 危险!可能覆盖系统文件

# 正确:安全处理上传文件
import uuid
import magic
from pathlib import Path

ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB

@app.post("/upload")
async def upload(file: UploadFile):
    # 检查文件大小
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, "文件过大")
    
    # 检查文件类型
    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "不支持的文件类型")
    
    # 使用 magic 检查真实文件类型
    mime = magic.from_buffer(content, mime=True)
    if not mime.startswith('image/'):
        raise HTTPException(400, "无效的图片文件")
    
    # 生成安全文件名
    filename = f"{uuid.uuid4()}{ext}"
    filepath = Path("/uploads") / filename[:2] / filename[2:4] / filename
    filepath.parent.mkdir(parents=True, exist_ok=True)
    
    with open(filepath, "wb") as f:
        f.write(content)
    
    return {"url": f"/media/{filename[:2]}/{filename[2:4]}/{filename}"}

5. 时区处理

# 问题:时区混乱
from datetime import datetime

created_at = datetime.now()  # 使用服务器本地时间,不可靠

# 正确:始终使用 UTC
from datetime import datetime, timezone

created_at = datetime.now(timezone.utc)  # UTC 时间

# 存储时保持 UTC
# 展示时转换为用户时区
from zoneinfo import ZoneInfo

def to_user_timezone(dt: datetime, user_tz: str = "Asia/Shanghai") -> datetime:
    return dt.astimezone(ZoneInfo(user_tz))

课后练习

练习 1:功能扩展

  1. 实现文章草稿自动保存功能
  2. 添加文章版本历史记录
  3. 实现文章导入导出(Markdown/PDF)
  4. 添加文章协作编辑功能

练习 2:性能优化

  1. 实现数据库查询优化(添加索引、优化慢查询)
  2. 配置 Redis 缓存策略
  3. 实现图片 CDN 加速
  4. 添加数据库读写分离

练习 3:安全加固

  1. 实现 API 速率限制
  2. 添加 XSS 和 CSRF 防护
  3. 配置 SQL 注入防护
  4. 实现敏感数据加密存储

练习 4:完整部署

  1. 编写 Dockerfile 和 docker-compose.yml
  2. 配置 CI/CD 流水线
  3. 部署到云服务器
  4. 配置监控和告警

总结

通过本讲的学习,我们完成了一个企业级博客系统的完整开发:

  1. 架构设计:从需求分析到技术选型,构建了清晰的系统架构
  2. 数据建模:设计了完整的数据库模型,包括用户、文章、分类、标签、评论等
  3. API 开发:使用 FastAPI 开发了 RESTful API,实现了完整的业务逻辑
  4. 前端集成:展示了前端 Vue.js 与后端 API 的集成
  5. 部署实践:提供了 Docker 和 Kubernetes 的部署配置

这个项目涵盖了现代 Web 开发的各个方面,是一个很好的综合实践案例。建议你在此基础上继续扩展功能,如添加更多社交功能、优化搜索体验、实现实时通知等。


参考资源

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐