[Mastering Python Web Development] Lecture 25 | Enterprise Blog System in Practice: Building a Complete Project from Scratch
Environment
- Python: 3.12+
- FastAPI: 0.115+
- SQLAlchemy: 2.0+
- Pydantic: 2.0+
- Database: PostgreSQL 16+
- Cache: Redis 7+
- Frontend: Vue.js 3.x or React 18+
- Deployment: Docker 24+, Kubernetes 1.28+
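Learning Objectives
After this lecture, you will be able to:
- Carry out requirements analysis and system architecture design end to end
- Design sound database models and relationships
- Develop RESTful API endpoints and implement the business logic behind them
- Implement user authentication, permission management, and content management
- Integrate the frontend and complete frontend-backend integration testing
- Deploy the system to production in containers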
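1. Project Overview and Architecture Design
1.1 Requirements Analysis
Functional requirements:
| Module | Features | Priority |
|---|---|---|
| User system | Registration, login, JWT authentication, profile page | High |
| Post management | Publish, edit, delete, drafts | High |
| Categories and tags | Post categories, tag management | High |
| Comment system | Comments, replies, moderation | Medium |
| Search | Full-text search, filtering and sorting | Medium |
| File management | Image upload, asset management | Medium |
| Admin console | Statistics, content moderation | Medium |
| SEO | Sitemap, meta tags | Low |
Non-functional requirements:
- Support 1000+ concurrent users
- API response time < 200 ms
- 99.9% system availability
- Support horizontal scaling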
1.2 System Architecture
┌───────────────────────────────────────────────────────────────┐
│                         Client Layer                          │
│ ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│ │     Web App     │  │   Mobile Web    │  │   Admin Panel   │ │
│ │    (Vue.js)     │  │  (Responsive)   │  │ (Element Plus)  │ │
│ └─────────────────┘  └─────────────────┘  └─────────────────┘ │
└───────────────────────────────┬───────────────────────────────┘
                                │
┌───────────────────────────────┴───────────────────────────────┐
│         CDN / Nginx (static assets / load balancing)          │
└───────────────────────────────┬───────────────────────────────┘
                                │
┌───────────────────────────────┴───────────────────────────────┐
│                   API Gateway (Kong/Nginx)                    │
│        Rate limiting, authentication, routing, logging        │
└───────────────────────────────┬───────────────────────────────┘
                                │
┌───────────────────────────────┴───────────────────────────────┐
│              Application Service Layer (FastAPI)              │
│ ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│ │  User Service   │  │  Post Service   │  │ Comment Service │ │
│ └─────────────────┘  └─────────────────┘  └─────────────────┘ │
└───────────────────────────────┬───────────────────────────────┘
                                │
┌───────────────────────────────┴───────────────────────────────┐
│                          Data Layer                           │
│ ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│ │   PostgreSQL    │  │      Redis      │  │  Elasticsearch  │ │
│ │  (primary DB)   │  │ (cache/session) │  │   (full-text)   │ │
│ └─────────────────┘  └─────────────────┘  └─────────────────┘ │
└───────────────────────────────────────────────────────────────┘
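1.3 Technology Stack
| Layer | Technology | Notes |
|---|---|---|
| Backend framework | FastAPI | High-performance async framework |
| Database ORM | SQLAlchemy 2.0 | Async database access |
| Cache | Redis | Sessions, caching, rate limiting |
| Search | Elasticsearch | Full-text search |
| Message queue | Redis/RabbitMQ | Asynchronous tasks |
| Task scheduling | Celery | Scheduled jobs, background processing |
| Object storage | MinIO/AWS S3 | File storage |
| Monitoring | Prometheus + Grafana | Metrics monitoring |
| Logging | ELK Stack | Log collection and analysis |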
2. Project Structure and Initialization
2.1 Directory Layout
enterprise-blog/
├── app/
│   ├── __init__.py
│   ├── main.py              # Application entry point
│   ├── config.py            # Configuration management
│   ├── database.py          # Database connection
│   ├── cache.py             # Cache configuration
│   ├── models/              # Database models
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   ├── category.py
│   │   ├── tag.py
│   │   └── comment.py
│   ├── schemas/             # Pydantic schemas
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── post.py
│   │   └── common.py
│   ├── api/                 # API routes
│   │   ├── __init__.py
│   │   ├── deps.py          # Dependency injection
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── auth.py
│   │       ├── users.py
│   │       ├── posts.py
│   │       ├── categories.py
│   │       ├── tags.py
│   │       └── comments.py
│   ├── services/            # Business logic layer
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   ├── post_service.py
│   │   └── search_service.py
│   ├── core/                # Core functionality
│   │   ├── __init__.py
│   │   ├── security.py      # Security helpers
│   │   ├── exceptions.py    # Custom exceptions
│   │   └── middleware.py    # Middleware
│   └── utils/               # Utility functions
│       ├── __init__.py
│       ├── pagination.py
│       └── validators.py
├── alembic/                 # Database migrations
├── tests/                   # Test code
├── docker/                  # Docker configuration
├── k8s/                     # Kubernetes configuration
├── scripts/                 # Scripts
├── requirements.txt
├── requirements-dev.txt
├── Dockerfile
├── docker-compose.yml
└── README.md
2.2 Configuration
# app/config.py
"""
Application configuration management.
"""
from functools import lru_cache
from typing import List

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings, read from environment variables or a .env file."""

    # Pydantic v2 style; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Application info
    APP_NAME: str = "Enterprise Blog"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # Security (SECRET_KEY has no default, so it must be provided)
    SECRET_KEY: str
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    ALGORITHM: str = "HS256"

    # Database
    DATABASE_URL: str = "postgresql+asyncpg://user:pass@localhost/blog"
    DATABASE_POOL_SIZE: int = 20
    DATABASE_MAX_OVERFLOW: int = 10

    # Redis
    REDIS_URL: str = "redis://localhost:6379/0"
    REDIS_POOL_SIZE: int = 50

    # Elasticsearch
    ELASTICSEARCH_URL: str = "http://localhost:9200"

    # File storage
    MINIO_ENDPOINT: str = "localhost:9000"
    MINIO_ACCESS_KEY: str = ""
    MINIO_SECRET_KEY: str = ""
    MINIO_BUCKET_NAME: str = "blog-media"

    # Email
    SMTP_HOST: str = ""
    SMTP_PORT: int = 587
    SMTP_USER: str = ""
    SMTP_PASSWORD: str = ""

    # CORS
    CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:5173"]

    # Pagination
    DEFAULT_PAGE_SIZE: int = 20
    MAX_PAGE_SIZE: int = 100

    # Cache
    CACHE_DEFAULT_TIMEOUT: int = 300  # 5 minutes


@lru_cache()
def get_settings() -> Settings:
    """Return the settings instance (cached, so .env is read only once)."""
    return Settings()


settings = get_settings()
2.3 Database Configuration
# app/database.py
"""
Database connection setup.
"""
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

from app.config import settings

# Create the async engine
engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=settings.DATABASE_POOL_SIZE,
    max_overflow=settings.DATABASE_MAX_OVERFLOW,
    pool_pre_ping=True,   # check connections before handing them out
    pool_recycle=3600,    # recycle connections after one hour
    echo=settings.DEBUG,
)

# Create the session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)

# Declarative base class for all models
Base = declarative_base()


async def get_db():
    """Yield a database session (FastAPI dependency)."""
    # The context manager closes the session on exit, so no explicit close() is needed
    async with AsyncSessionLocal() as session:
        yield session


async def init_db():
    """Initialize the database (create tables)."""
    # All model modules must be imported before this runs,
    # otherwise their tables are not registered on Base.metadata
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
3. Data Model Design
3.1 User Model
# app/models/user.py
"""
User model.
"""
import enum

from sqlalchemy import Boolean, Column, DateTime, Enum, Integer, String, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base


class UserRole(str, enum.Enum):
    """User roles."""
    READER = "reader"
    AUTHOR = "author"
    EDITOR = "editor"
    ADMIN = "admin"


class UserStatus(str, enum.Enum):
    """User account status."""
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"


class User(Base):
    """Users table."""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)

    # Profile
    nickname = Column(String(50), nullable=True)
    avatar = Column(String(255), nullable=True)
    bio = Column(Text, nullable=True)
    website = Column(String(255), nullable=True)

    # Role and status
    role = Column(Enum(UserRole), default=UserRole.READER, nullable=False)
    status = Column(Enum(UserStatus), default=UserStatus.ACTIVE, nullable=False)

    # Counters
    post_count = Column(Integer, default=0)
    comment_count = Column(Integer, default=0)

    # Timestamps
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)

    # Soft delete
    is_deleted = Column(Boolean, default=False)
    deleted_at = Column(DateTime(timezone=True), nullable=True)

    # Relationships
    posts = relationship("Post", back_populates="author", lazy="selectin")
    comments = relationship("Comment", back_populates="author", lazy="selectin")

    def __repr__(self):
        return f"<User {self.username}>"
3.2 Post Model
# app/models/post.py
"""
Post model.
"""
import enum

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Table, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base


class PostStatus(str, enum.Enum):
    """Post status."""
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"


# Post-tag association table (many-to-many)
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)


class Post(Base):
    """Posts table."""
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)

    # Basic fields
    title = Column(String(200), nullable=False)
    slug = Column(String(200), unique=True, index=True, nullable=False)
    summary = Column(String(500), nullable=True)
    content = Column(Text, nullable=False)
    content_html = Column(Text, nullable=True)  # rendered HTML

    # Cover image
    cover_image = Column(String(255), nullable=True)

    # Status and metadata (stored as a plain string, hence .value)
    status = Column(String(20), default=PostStatus.DRAFT.value, nullable=False)
    is_top = Column(Boolean, default=False)  # pinned to the top
    allow_comment = Column(Boolean, default=True)

    # Counters
    view_count = Column(Integer, default=0)
    like_count = Column(Integer, default=0)
    comment_count = Column(Integer, default=0)

    # Foreign keys
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    category_id = Column(Integer, ForeignKey("categories.id"), nullable=True)

    # Timestamps
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    published_at = Column(DateTime(timezone=True), nullable=True)

    # Soft delete
    is_deleted = Column(Boolean, default=False)

    # Relationships
    author = relationship("User", back_populates="posts")
    category = relationship("Category", back_populates="posts")
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")
    comments = relationship("Comment", back_populates="post", lazy="selectin")

    def __repr__(self):
        return f"<Post {self.title}>"
3.3 Category and Tag Models
# app/models/category.py
"""
Category model.
"""
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base


class Category(Base):
    """Categories table."""
    __tablename__ = "categories"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, nullable=False)
    slug = Column(String(50), unique=True, index=True, nullable=False)
    description = Column(String(200), nullable=True)

    # Hierarchy
    parent_id = Column(Integer, ForeignKey("categories.id"), nullable=True)
    sort_order = Column(Integer, default=0)

    # Counters
    post_count = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships
    posts = relationship("Post", back_populates="category")
    # Self-referential: remote_side marks the many-to-one direction, so it
    # belongs on `parent`; the backref then exposes the list of children.
    # (Putting remote_side on `children` would swap the two directions.)
    parent = relationship("Category", remote_side=[id], backref="children")

    def __repr__(self):
        return f"<Category {self.name}>"


# app/models/tag.py
"""
Tag model.
"""
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base


class Tag(Base):
    """Tags table."""
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(30), unique=True, nullable=False)
    slug = Column(String(30), unique=True, index=True, nullable=False)

    # Counters
    post_count = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships
    posts = relationship("Post", secondary="post_tags", back_populates="tags")

    def __repr__(self):
        return f"<Tag {self.name}>"
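The `parent_id` and `sort_order` columns on the category table form an adjacency list. As a rough illustration of how such flat rows can be turned into a nested tree for a navigation menu, here is a minimal, database-free sketch. `CategoryNode` and `build_category_tree` are hypothetical names introduced only for this example; they are not part of the project code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CategoryNode:
    """Hypothetical in-memory stand-in for a row of the categories table."""
    id: int
    name: str
    parent_id: Optional[int] = None
    sort_order: int = 0
    children: list["CategoryNode"] = field(default_factory=list)


def build_category_tree(rows: list[CategoryNode]) -> list[CategoryNode]:
    """Attach every row to its parent and return the top-level nodes."""
    by_id = {row.id: row for row in rows}
    roots: list[CategoryNode] = []
    # Visit rows in display order so each children list ends up sorted
    for row in sorted(rows, key=lambda r: (r.sort_order, r.id)):
        parent = by_id.get(row.parent_id) if row.parent_id is not None else None
        if parent is None:
            # No parent (or the parent row is missing): treat as top level
            roots.append(row)
        else:
            parent.children.append(row)
    return roots


rows = [
    CategoryNode(1, "Backend"),
    CategoryNode(2, "Python", parent_id=1, sort_order=1),
    CategoryNode(3, "Databases", parent_id=1, sort_order=0),
    CategoryNode(4, "Frontend"),
]
tree = build_category_tree(rows)
for node in tree:
    print(node.name, [child.name for child in node.children])
```

Building the tree in Python from a single query avoids one lazy load per level, which matters under `AsyncSession`, where implicit lazy loading is not available.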
3.4 Comment Model
# app/models/comment.py
"""
Comment model.
"""
import enum

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base


class CommentStatus(str, enum.Enum):
    """Comment moderation status."""
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"


class Comment(Base):
    """Comments table."""
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)

    # Content
    content = Column(Text, nullable=False)
    content_html = Column(Text, nullable=True)

    # Status (stored as a plain string, hence .value)
    status = Column(String(20), default=CommentStatus.APPROVED.value, nullable=False)

    # Foreign keys
    post_id = Column(Integer, ForeignKey("posts.id", ondelete="CASCADE"), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=True)  # NULL for guests
    parent_id = Column(Integer, ForeignKey("comments.id", ondelete="CASCADE"), nullable=True)

    # Guest details (users who are not logged in)
    guest_name = Column(String(50), nullable=True)
    guest_email = Column(String(100), nullable=True)
    guest_website = Column(String(255), nullable=True)

    # Likes
    like_count = Column(Integer, default=0)

    # Timestamps
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships
    post = relationship("Post", back_populates="comments")
    author = relationship("User", back_populates="comments")
    parent = relationship("Comment", remote_side="Comment.id", backref="replies")

    def __repr__(self):
        return f"<Comment {self.id}>"
4. Pydantic Schemas
4.1 Base Schemas
# app/schemas/common.py
"""
Shared Pydantic schemas.
"""
from datetime import datetime
from typing import Generic, List, Optional, TypeVar

from pydantic import BaseModel, ConfigDict

T = TypeVar("T")


class ResponseBase(BaseModel):
    """Base response schema; from_attributes allows building it from ORM objects."""
    model_config = ConfigDict(from_attributes=True)


class PaginationParams(BaseModel):
    """Pagination parameters."""
    page: int = 1
    page_size: int = 20

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.page_size


class PaginatedResponse(ResponseBase, Generic[T]):
    """Paginated response schema."""
    items: List[T]
    total: int
    page: int
    page_size: int
    pages: int

    # Plain properties are convenient in Python code but are not serialized
    @property
    def has_next(self) -> bool:
        return self.page < self.pages

    @property
    def has_prev(self) -> bool:
        return self.page > 1


class TimestampMixin(ResponseBase):
    """Timestamp mixin."""
    created_at: datetime
    updated_at: Optional[datetime] = None
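The pagination schemas rely on two small pieces of arithmetic: the row offset for a page and the ceiling division that yields the page count. The following standalone sketch spells both out with plain integers. The function name `paginate` is an assumption made for this example; the formulas mirror those used by `PaginationParams.offset` and the service layer.

```python
def paginate(total: int, page: int, page_size: int) -> dict:
    """Compute the pagination metadata for one page of a result set."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    # Ceiling division without floats: 45 rows at 20 per page -> 3 pages
    pages = (total + page_size - 1) // page_size
    return {
        "offset": (page - 1) * page_size,  # rows to skip (SQL OFFSET)
        "limit": page_size,                # rows to fetch (SQL LIMIT)
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
    }


last_page = paginate(total=45, page=3, page_size=20)
empty = paginate(total=0, page=1, page_size=20)
print(last_page)
print(empty)
```

Note that an empty result set gives `pages == 0`, so `has_next` is false on page 1 without any special casing.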
4.2 User Schemas
# app/schemas/user.py
"""
User Pydantic schemas.
"""
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, EmailStr, Field

from app.models.user import UserRole
from app.schemas.common import ResponseBase, TimestampMixin


# ========== Request schemas ==========
class UserCreate(BaseModel):
    """User registration request."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8, max_length=100)
    nickname: Optional[str] = Field(None, max_length=50)


class UserLogin(BaseModel):
    """User login request."""
    username: str
    password: str


class UserUpdate(BaseModel):
    """User profile update request."""
    nickname: Optional[str] = Field(None, max_length=50)
    bio: Optional[str] = Field(None, max_length=500)
    website: Optional[str] = Field(None, max_length=255)
    avatar: Optional[str] = None


class PasswordChange(BaseModel):
    """Password change request."""
    old_password: str
    new_password: str = Field(..., min_length=8, max_length=100)


# ========== Response schemas ==========
class UserBase(ResponseBase):
    """Basic user information."""
    id: int
    username: str
    nickname: Optional[str] = None
    avatar: Optional[str] = None


class UserResponse(UserBase, TimestampMixin):
    """Detailed user information (for the account owner)."""
    email: EmailStr
    bio: Optional[str] = None
    website: Optional[str] = None
    role: UserRole
    post_count: int = 0
    comment_count: int = 0
    last_login: Optional[datetime] = None


class UserProfile(UserBase):
    """Public user profile."""
    bio: Optional[str] = None
    website: Optional[str] = None
    post_count: int = 0
    created_at: datetime


class TokenResponse(BaseModel):
    """Token response."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int  # access token lifetime in seconds
4.3 Post Schemas
# app/schemas/post.py
"""
Post Pydantic schemas.
"""
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field

from app.models.post import PostStatus
from app.schemas.common import ResponseBase, TimestampMixin
from app.schemas.user import UserBase


# ========== Request schemas ==========
class PostCreate(BaseModel):
    """Create-post request."""
    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1)
    summary: Optional[str] = Field(None, max_length=500)
    cover_image: Optional[str] = None
    category_id: Optional[int] = None
    tag_ids: List[int] = []
    status: PostStatus = PostStatus.DRAFT
    is_top: bool = False
    allow_comment: bool = True


class PostUpdate(BaseModel):
    """Update-post request; every field is optional."""
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    content: Optional[str] = None
    summary: Optional[str] = Field(None, max_length=500)
    cover_image: Optional[str] = None
    category_id: Optional[int] = None
    tag_ids: Optional[List[int]] = None
    status: Optional[PostStatus] = None
    is_top: Optional[bool] = None
    allow_comment: Optional[bool] = None


class PostListFilter(BaseModel):
    """Post list filters."""
    category_id: Optional[int] = None
    tag_id: Optional[int] = None
    author_id: Optional[int] = None
    status: Optional[PostStatus] = PostStatus.PUBLISHED
    keyword: Optional[str] = None


# ========== Response schemas ==========
class CategoryBrief(ResponseBase):
    """Brief category information."""
    id: int
    name: str
    slug: str


class TagBrief(ResponseBase):
    """Brief tag information."""
    id: int
    name: str
    slug: str


# TimestampMixin already derives from ResponseBase; listing both as bases
# (ResponseBase first) raises "Cannot create a consistent method resolution order"
class PostBrief(TimestampMixin):
    """Post list item."""
    id: int
    title: str
    slug: str
    summary: Optional[str] = None
    cover_image: Optional[str] = None
    status: PostStatus
    is_top: bool
    view_count: int
    like_count: int
    comment_count: int
    author: UserBase
    category: Optional[CategoryBrief] = None


class PostDetail(PostBrief):
    """Post detail."""
    content: str
    content_html: Optional[str] = None
    allow_comment: bool
    published_at: Optional[datetime] = None
    tags: List[TagBrief] = []


class PostArchive(ResponseBase):
    """Post archive entry (posts per month)."""
    year: int
    month: int
    count: int
5. API Development
5.1 Authentication Dependencies
# app/api/deps.py
"""
API dependencies.
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.models.user import User, UserRole, UserStatus
from app.services.user_service import UserService

# Security scheme: reads the token from the "Authorization: Bearer <token>" header
security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Return the currently authenticated user."""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        # A missing or non-numeric "sub" claim is treated as an invalid token
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError):
        raise credentials_exception

    user_service = UserService(db)
    user = await user_service.get_by_id(user_id)
    if user is None or user.status != UserStatus.ACTIVE:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the current user, rejecting soft-deleted accounts."""
    if current_user.is_deleted:
        raise HTTPException(status_code=400, detail="User has been deleted")
    return current_user


class RoleChecker:
    """Dependency that allows only the given roles."""

    def __init__(self, allowed_roles: list[UserRole]):
        self.allowed_roles = allowed_roles

    def __call__(self, user: User = Depends(get_current_active_user)) -> User:
        if user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return user


# Commonly used role-check dependencies
require_admin = RoleChecker([UserRole.ADMIN])
require_editor = RoleChecker([UserRole.ADMIN, UserRole.EDITOR])
require_author = RoleChecker([UserRole.ADMIN, UserRole.EDITOR, UserRole.AUTHOR])
5.2 Authentication Endpoints
# app/api/v1/auth.py
"""
Authentication endpoints.
"""
from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.core.security import create_access_token, create_refresh_token, verify_password
from app.database import get_db
from app.schemas.user import TokenResponse, UserCreate, UserLogin, UserResponse
from app.services.user_service import UserService

router = APIRouter(prefix="/auth", tags=["Auth"])


@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Register a new user."""
    user_service = UserService(db)

    # Reject a username that is already taken
    if await user_service.get_by_username(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists",
        )

    # Reject an email that is already registered
    if await user_service.get_by_email(user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email is already registered",
        )

    # Create the user
    user = await user_service.create(user_data)
    return user


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Log a user in and issue tokens."""
    user_service = UserService(db)

    # Verify the credentials; the same error covers both cases so that
    # the response does not reveal whether the username exists
    user = await user_service.get_by_username(form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if user.status != "active":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Account is disabled",
        )

    # Record the last login time
    await user_service.update_last_login(user.id)

    # Issue the tokens
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "role": user.role.value},
        expires_delta=access_token_expires,
    )
    refresh_token_expires = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    refresh_token = create_refresh_token(
        data={"sub": str(user.id)},
        expires_delta=refresh_token_expires,
    )
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh_access_token(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    """Exchange a refresh token for a new access token."""
    # JWTError must be imported too; the except clause below depends on it
    from jose import JWTError, jwt

    try:
        payload = jwt.decode(
            refresh_token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        user_id: str = payload.get("sub")
        token_type: str = payload.get("type")
        # Only tokens issued as refresh tokens are accepted here
        if user_id is None or token_type != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token",
            )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )

    # Make sure the user still exists and is active
    user_service = UserService(db)
    user = await user_service.get_by_id(int(user_id))
    if not user or user.status != "active":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User does not exist or is disabled",
        )

    # Issue a new access token; the refresh token is returned unchanged
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "role": user.role.value},
        expires_delta=access_token_expires,
    )
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )
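The refresh endpoint accepts a token only when its `type` claim equals `"refresh"`, but `app/core/security.py`, which issues the tokens, is not shown in this lecture. To make the mechanics visible, here is a standard-library-only sketch of HS256 signing and verification with a `type` claim. The helpers `encode_token` and `decode_token` are hypothetical and written for illustration; the project itself uses python-jose for this job, and the secret below is a placeholder.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(claims: dict, secret: str, token_type: str, ttl_seconds: int) -> str:
    """Sign `claims` as an HS256 JWT, adding `type` and `exp` claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {**claims, "type": token_type, "exp": int(time.time()) + ttl_seconds}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token: str, secret: str, expected_type: str) -> dict:
    """Verify signature, token type and expiry; raise ValueError on any failure."""
    try:
        signing_input, signature = token.rsplit(".", 1)
        payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
        received = _b64url_decode(signature)
    except (ValueError, IndexError) as exc:
        raise ValueError("malformed token") from exc
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(received, expected):
        raise ValueError("bad signature")
    if payload.get("type") != expected_type:
        raise ValueError("wrong token type")
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload


secret = "demo-secret"  # placeholder; a real key comes from configuration
access = encode_token({"sub": "42", "role": "author"}, secret, "access", 30 * 60)
refresh = encode_token({"sub": "42"}, secret, "refresh", 7 * 24 * 3600)
claims = decode_token(refresh, secret, expected_type="refresh")
print(claims["sub"], claims["type"])
```

Passing the access token to `decode_token(..., expected_type="refresh")` raises `ValueError`, which is the property the `/refresh` endpoint depends on: a short-lived access token cannot be used to mint new tokens.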
5.3 Post Endpoints
# app/api/v1/posts.py
"""
Post endpoints.
"""
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, require_author
from app.database import get_db
from app.models.user import User
from app.schemas.common import PaginatedResponse, PaginationParams
from app.schemas.post import (
    PostArchive, PostBrief, PostCreate, PostDetail,
    PostListFilter, PostUpdate,
)
from app.services.post_service import PostService

router = APIRouter(prefix="/posts", tags=["Posts"])


@router.get("", response_model=PaginatedResponse[PostBrief])
async def list_posts(
    category_id: Optional[int] = None,
    tag_id: Optional[int] = None,
    author_id: Optional[int] = None,
    keyword: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """List published posts, with optional filters."""
    filters = PostListFilter(
        category_id=category_id,
        tag_id=tag_id,
        author_id=author_id,
        keyword=keyword,
    )
    pagination = PaginationParams(page=page, page_size=page_size)
    post_service = PostService(db)
    result = await post_service.get_list(filters, pagination)
    return result


# Declared before "/{slug}" so that "archives" is not captured as a slug
@router.get("/archives", response_model=List[PostArchive])
async def get_archives(
    db: AsyncSession = Depends(get_db),
):
    """Return the post archive (posts per month)."""
    post_service = PostService(db)
    return await post_service.get_archives()


@router.get("/{slug}", response_model=PostDetail)
async def get_post(
    slug: str,
    db: AsyncSession = Depends(get_db),
):
    """Return a single post by slug."""
    post_service = PostService(db)
    post = await post_service.get_by_slug(slug)
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )
    # Increment the view counter
    await post_service.increment_view_count(post.id)
    return post


@router.post("", response_model=PostDetail, status_code=status.HTTP_201_CREATED)
async def create_post(
    post_data: PostCreate,
    current_user: User = Depends(require_author),
    db: AsyncSession = Depends(get_db),
):
    """Create a post (authors, editors and admins only)."""
    post_service = PostService(db)
    post = await post_service.create(post_data, current_user.id)
    return post


@router.put("/{post_id}", response_model=PostDetail)
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Update a post."""
    post_service = PostService(db)

    # Load the post
    post = await post_service.get_by_id(post_id)
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    # Permission check: only the author, an editor or an admin may edit.
    # UserRole is a str-based enum, so comparing against plain strings works.
    if post.author_id != current_user.id and current_user.role not in ["admin", "editor"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You are not allowed to edit this post",
        )

    updated_post = await post_service.update(post_id, post_data)
    return updated_post


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete a post (soft delete)."""
    post_service = PostService(db)
    post = await post_service.get_by_id(post_id)
    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    # Permission check
    if post.author_id != current_user.id and current_user.role not in ["admin", "editor"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You are not allowed to delete this post",
        )

    await post_service.delete(post_id)
    return None
5.4 Service Layer
# app/services/post_service.py
"""
Post service layer.
"""
from datetime import datetime
from typing import List, Optional

from slugify import slugify
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.post import Post, PostStatus
from app.models.tag import Tag
from app.schemas.common import PaginatedResponse, PaginationParams
from app.schemas.post import PostCreate, PostListFilter, PostUpdate


class PostService:
    """Post service."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, post_id: int) -> Optional[Post]:
        """Fetch a post by ID (any status, excluding soft-deleted posts)."""
        result = await self.db.execute(
            select(Post)
            .where(Post.id == post_id, Post.is_deleted.is_(False))
            # Eager-load relationships: lazy loading is unavailable under AsyncSession
            .options(
                selectinload(Post.author),
                selectinload(Post.category),
                selectinload(Post.tags),
            )
        )
        return result.scalar_one_or_none()

    async def get_by_slug(self, slug: str) -> Optional[Post]:
        """Fetch a published post by slug."""
        result = await self.db.execute(
            select(Post)
            .where(
                Post.slug == slug,
                Post.is_deleted.is_(False),
                Post.status == PostStatus.PUBLISHED,
            )
            .options(
                selectinload(Post.author),
                selectinload(Post.category),
                selectinload(Post.tags),
                selectinload(Post.comments),
            )
        )
        return result.scalar_one_or_none()

    async def get_list(
        self,
        filters: PostListFilter,
        pagination: PaginationParams,
    ) -> PaginatedResponse:
        """Fetch a page of posts."""
        # Build the base query. The fallback must be applied to the value:
        # `Post.status == filters.status or ...` would apply Python's `or`
        # to the SQL expression itself instead of choosing a status.
        status_filter = filters.status or PostStatus.PUBLISHED
        query = select(Post).where(
            Post.is_deleted.is_(False),
            Post.status == status_filter,
        )

        # Apply the filters
        if filters.category_id:
            query = query.where(Post.category_id == filters.category_id)
        if filters.author_id:
            query = query.where(Post.author_id == filters.author_id)
        if filters.keyword:
            search = f"%{filters.keyword}%"
            query = query.where(
                Post.title.ilike(search) | Post.summary.ilike(search)
            )

        # Tag filter (requires a join through the association table)
        if filters.tag_id:
            query = query.join(Post.tags).where(Tag.id == filters.tag_id)

        # Count the total before pagination is applied
        count_query = select(func.count()).select_from(query.subquery())
        total_result = await self.db.execute(count_query)
        total = total_result.scalar_one()

        # Sort and paginate: pinned posts first, then newest first
        query = (
            query
            .order_by(desc(Post.is_top), desc(Post.published_at), desc(Post.created_at))
            .offset(pagination.offset)
            .limit(pagination.page_size)
            .options(
                selectinload(Post.author),
                selectinload(Post.category),
            )
        )
        result = await self.db.execute(query)
        items = result.scalars().all()

        # Ceiling division for the page count
        pages = (total + pagination.page_size - 1) // pagination.page_size
        return PaginatedResponse(
            items=list(items),
            total=total,
            page=pagination.page,
            page_size=pagination.page_size,
            pages=pages,
        )
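
    async def create(self, post_data: PostCreate, author_id: int) -> Post:
        """Create a post."""
        # Generate a unique slug. Check every row, drafts and soft-deleted
        # posts included, because the unique constraint on `slug` covers them;
        # get_by_slug() only sees published posts and would miss collisions.
        base_slug = slugify(post_data.title)
        slug = base_slug
        counter = 1
        while await self.db.scalar(select(Post.id).where(Post.slug == slug)) is not None:
            slug = f"{base_slug}-{counter}"
            counter += 1

        # Set the publication time. datetime.utcnow() is deprecated since
        # Python 3.12 and returns a naive value, so use an aware timestamp.
        published_at = None
        if post_data.status == PostStatus.PUBLISHED:
            published_at = datetime.now().astimezone()

        # Load the tags first so they can be attached before the INSERT;
        # assigning a collection to an already-flushed object triggers a
        # lazy load of the old value, which fails under AsyncSession
        tags = []
        if post_data.tag_ids:
            tags_result = await self.db.execute(
                select(Tag).where(Tag.id.in_(post_data.tag_ids))
            )
            tags = list(tags_result.scalars().all())

        # Create the post
        post = Post(
            title=post_data.title,
            slug=slug,
            summary=post_data.summary,
            content=post_data.content,
            cover_image=post_data.cover_image,
            status=post_data.status,
            is_top=post_data.is_top,
            allow_comment=post_data.allow_comment,
            author_id=author_id,
            category_id=post_data.category_id,
            published_at=published_at,
            tags=tags,
        )
        self.db.add(post)
        await self.db.commit()

        # Re-query with eager loading so that author, category and tags
        # are populated when the response model is serialized
        return await self.get_by_id(post.id)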
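
    async def update(self, post_id: int, post_data: PostUpdate) -> Optional[Post]:
        """Update a post; returns None if it does not exist."""
        post = await self.get_by_id(post_id)
        if not post:
            return None

        # Only fields that the client actually sent
        update_data = post_data.model_dump(exclude_unset=True)

        # Replace the tags if requested (post.tags was eager-loaded by get_by_id)
        tag_ids = update_data.pop("tag_ids", None)
        if tag_ids is not None:
            tags_result = await self.db.execute(
                select(Tag).where(Tag.id.in_(tag_ids))
            )
            post.tags = list(tags_result.scalars().all())

        # Stamp the publication time on the first transition to "published";
        # datetime.utcnow() is deprecated since Python 3.12 and is naive
        if "status" in update_data:
            if update_data["status"] == PostStatus.PUBLISHED and not post.published_at:
                post.published_at = datetime.now().astimezone()

        # Update the remaining fields
        for field, value in update_data.items():
            setattr(post, field, value)

        await self.db.commit()
        # refresh() reloads the columns (including updated_at); the re-query
        # then eager-loads the relationships for the response model
        await self.db.refresh(post)
        return await self.get_by_id(post_id)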

    async def delete(self, post_id: int) -> bool:
        """Soft-delete a post."""
        post = await self.get_by_id(post_id)
        if not post:
            return False
        post.is_deleted = True
        await self.db.commit()
        return True

    async def increment_view_count(self, post_id: int) -> None:
        """Increment the view counter."""
        # A single UPDATE ... SET view_count = view_count + 1 is atomic in the
        # database and avoids the lost updates of a read-modify-write cycle
        await self.db.execute(
            Post.__table__.update()
            .where(Post.id == post_id)
            .values(view_count=Post.view_count + 1)
        )
        await self.db.commit()

    async def get_archives(self) -> List[dict]:
        """Return the number of published posts per year and month."""
        result = await self.db.execute(
            select(
                func.extract("year", Post.published_at).label("year"),
                func.extract("month", Post.published_at).label("month"),
                func.count().label("count"),
            )
            .where(
                Post.status == PostStatus.PUBLISHED,
                Post.is_deleted.is_(False),
            )
            .group_by("year", "month")
            .order_by(desc("year"), desc("month"))
        )
        archives = []
        for row in result.all():
            archives.append({
                "year": int(row.year),
                "month": int(row.month),
                "count": row.count,
            })
        return archives
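The slug handling in `PostService.create` follows a simple rule: derive a base slug from the title, then append `-1`, `-2`, and so on until the slug is free. The sketch below isolates that loop from the database so it can be run on its own. `simple_slugify` is a simplified, ASCII-only stand-in for python-slugify's `slugify()`, and the `taken` set is an assumption standing in for the uniqueness query against the posts table.

```python
import re
import unicodedata


def simple_slugify(title: str) -> str:
    """ASCII-only stand-in for python-slugify (illustration only)."""
    ascii_text = unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    # Collapse every run of non-alphanumeric characters into one hyphen
    return re.sub(r"[^a-z0-9]+", "-", ascii_text.lower()).strip("-")


def unique_slug(title: str, taken: set[str]) -> str:
    """Return a slug for `title` that does not collide with `taken`."""
    base = simple_slugify(title) or "post"  # fallback when nothing survives
    slug, counter = base, 1
    while slug in taken:
        slug = f"{base}-{counter}"
        counter += 1
    return slug


taken = {"hello-fastapi", "hello-fastapi-1"}
first = unique_slug("Hello, FastAPI!", taken)
second = unique_slug("Brand New Post", taken)
print(first, second)
```

Under concurrent requests two writers can still pick the same free slug, so the unique constraint on `posts.slug` remains the final safeguard; the loop only keeps collisions rare.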
6. Core Business Features
6.1 Full-Text Search Service
# app/services/search_service.py
"""
Elasticsearch full-text search service.
"""
from typing import List, Optional

from elasticsearch import AsyncElasticsearch

from app.config import settings


class SearchService:
    """Search service."""

    def __init__(self):
        self.client = AsyncElasticsearch([settings.ELASTICSEARCH_URL])
        self.index_name = "blog_posts"

    async def create_index(self):
        """Create the index if it does not exist yet."""
        mapping = {
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "standard",
                        # keyword sub-field for exact matching and sorting
                        "fields": {
                            "keyword": {"type": "keyword"}
                        },
                    },
                    "content": {
                        "type": "text",
                        "analyzer": "standard",
                    },
                    "summary": {"type": "text"},
                    "tags": {"type": "keyword"},
                    "category": {"type": "keyword"},
                    "author": {"type": "keyword"},
                    "published_at": {"type": "date"},
                    "status": {"type": "keyword"},
                }
            }
        }
        # Note: the 8.x Python client prefers named arguments over body=
        if not await self.client.indices.exists(index=self.index_name):
            await self.client.indices.create(index=self.index_name, body=mapping)

    async def index_post(self, post: dict):
        """Index (add or replace) a post document."""
        await self.client.index(
            index=self.index_name,
            id=post["id"],
            body=post,
        )
async def search(
self,
query: str,
page: int = 1,
page_size: int = 20
) -> dict:
"""搜索文章"""
search_body = {
"from": (page - 1) * page_size,
"size": page_size,
"query": {
"multi_match": {
"query": query,
"fields": ["title^3", "summary^2", "content", "tags"],
"type": "best_fields",
"fuzziness": "AUTO"
}
},
"highlight": {
"fields": {
"title": {},
"summary": {},
"content": {"fragment_size": 150, "number_of_fragments": 3}
}
},
"sort": [
{"_score": {"order": "desc"}},
{"published_at": {"order": "desc"}}
]
}
response = await self.client.search(
index=self.index_name,
body=search_body
)
hits = response["hits"]["hits"]
total = response["hits"]["total"]["value"]
results = []
for hit in hits:
results.append({
"id": hit["_id"],
"score": hit["_score"],
"source": hit["_source"],
"highlight": hit.get("highlight", {})
})
return {
"items": results,
"total": total,
"page": page,
"page_size": page_size,
"pages": (total + page_size - 1) // page_size
}
async def delete_post(self, post_id: int):
"""删除索引"""
await self.client.delete(index=self.index_name, id=post_id)
async def close(self):
"""关闭连接"""
await self.client.close()
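The `search` method turns a page number into an Elasticsearch `from` offset and derives the page count with a ceiling division. The sketch below isolates that arithmetic so it can be checked without a running cluster; `page_window` is a hypothetical helper name, not part of the service.

```python
def page_window(page: int, page_size: int, total: int) -> dict:
    """Compute the offset and page count used for paginated search."""
    page = max(page, 1)                            # pages are 1-based
    offset = (page - 1) * page_size                # value sent as "from"
    pages = (total + page_size - 1) // page_size   # ceiling division
    return {"from": offset, "size": page_size, "pages": pages}


# 45 hits with 20 per page: page 3 starts at offset 40 and is the last page
print(page_window(page=3, page_size=20, total=45))
```

Keep in mind that Elasticsearch rejects requests where `from + size` exceeds `index.max_result_window` (10,000 by default), so very deep pages need a different approach such as `search_after`.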
6.2 Cache Decorator
# app/cache.py
"""
Cache configuration and decorators
"""
import hashlib
import json
from functools import wraps
from typing import Callable, Optional

import redis.asyncio as redis

from app.config import settings

# Shared Redis client
redis_client = redis.Redis.from_url(
    settings.REDIS_URL,
    encoding="utf-8",
    decode_responses=True,
    max_connections=settings.REDIS_POOL_SIZE
)


def generate_cache_key(prefix: str, *args, **kwargs) -> str:
    """Build a key of the form "<prefix>:<md5 of the arguments>"."""
    key_data = f"{args!r}:{sorted(kwargs.items())!r}"
    digest = hashlib.md5(key_data.encode()).hexdigest()
    # The readable prefix lets patterns such as "post:*" match the key
    return f"{prefix}:{digest}"


def cached(
    prefix: str,
    expire: int = 300,
    key_builder: Optional[Callable] = None,
    skip_args: int = 0
):
    """Cache the JSON-serializable result of an async function.

    Use skip_args=1 on methods so that `self` stays out of the key:
    its default repr contains a memory address that differs per instance.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key
            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                cache_key = generate_cache_key(prefix, *args[skip_args:], **kwargs)
            # Try the cache first
            try:
                cached_value = await redis_client.get(cache_key)
                if cached_value is not None:
                    return json.loads(cached_value)
            except Exception:
                pass  # a cache failure must not break the main flow
            # Call the wrapped function
            result = await func(*args, **kwargs)
            # Store the result
            try:
                await redis_client.setex(
                    cache_key,
                    expire,
                    json.dumps(result, default=str)
                )
            except Exception:
                pass
            return result
        return wrapper
    return decorator


async def cache_delete_pattern(pattern: str) -> None:
    """Delete every key that matches the pattern."""
    # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
    keys = [key async for key in redis_client.scan_iter(match=pattern)]
    if keys:
        await redis_client.delete(*keys)


# Usage example
class PostService:
    @cached("post", expire=600, skip_args=1)
    async def get_by_slug(self, slug: str):
        # Load from the database; the result must be JSON-serializable
        return await self._fetch_from_db(slug)

    async def update(self, post_id: int, data: dict):
        # Update the database
        result = await self._update_db(post_id, data)
        # Invalidate the related cache entries
        await cache_delete_pattern("post:*")
        return result
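Two details of hashed cache keys are easy to miss, and the sketch below makes them visible without Redis. First, when a key builder hashes every positional argument of a method, `self` is included, and its default repr contains a memory address, so two service instances never share a key. Second, a bare MD5 digest carries no readable prefix, so a pattern such as `post:*` cannot match it. The `hashed_key` helper is a standalone illustration written for this demonstration, not the project's module.

```python
import hashlib


def hashed_key(prefix: str, *args, **kwargs) -> str:
    """Hash the prefix and all arguments into a bare MD5 digest."""
    key_data = f"{prefix}:{str(args)}:{str(kwargs)}"
    return hashlib.md5(key_data.encode()).hexdigest()


class PostService:
    """Stand-in class; only its default repr matters here."""


service_a = PostService()
service_b = PostService()

# `self` is part of args when a method is decorated
key_a = hashed_key("post", service_a, "hello-world")
key_b = hashed_key("post", service_b, "hello-world")
print(key_a == key_b)  # the two instances produce different keys

# Leaving `self` out makes the key depend only on the real arguments
stable_a = hashed_key("post", "hello-world")
stable_b = hashed_key("post", "hello-world")
print(stable_a == stable_b)

# A hex digest never starts with "post:", so "post:*" cannot match it
print(stable_a.startswith("post:"))
```

A practical consequence is that a key builder for methods should skip `self` and keep the prefix outside the hash, for example `f"{prefix}:{digest}"`.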
7. Frontend Integration
7.1 API Client
// frontend/src/api/client.ts
// API client configuration
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';
const API_BASE_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000/api/v1';
class ApiClient {
private client: AxiosInstance;
constructor() {
this.client = axios.create({
baseURL: API_BASE_URL,
timeout: 10000,
headers: {
'Content-Type': 'application/json',
},
});
// Request interceptor: attach the access token
this.client.interceptors.request.use(
(config) => {
const token = localStorage.getItem('access_token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
(error) => Promise.reject(error)
);
// Response interceptor
this.client.interceptors.response.use(
  (response) => response.data,
  async (error) => {
    const originalRequest = error.config;
    // The refresh call itself is excluded so that a rejected refresh token
    // cannot trigger another refresh attempt and loop.
    const isRefreshCall = originalRequest?.url?.includes('/auth/refresh');
    // Access token expired: try to refresh it once
    if (
      error.response?.status === 401 &&
      originalRequest &&
      !originalRequest._retry &&
      !isRefreshCall
    ) {
      originalRequest._retry = true;
      try {
        const refreshToken = localStorage.getItem('refresh_token');
        // The success handler above already unwraps response.data
        const data: any = await this.client.post('/auth/refresh', {
          refresh_token: refreshToken,
        });
        localStorage.setItem('access_token', data.access_token);
        originalRequest.headers.Authorization = `Bearer ${data.access_token}`;
        return this.client(originalRequest);
      } catch (refreshError) {
        // Refresh failed: clear the tokens and redirect to the login page
        localStorage.removeItem('access_token');
        localStorage.removeItem('refresh_token');
        window.location.href = '/login';
        return Promise.reject(refreshError);
      }
    }
    return Promise.reject(error);
  }
);
}
async get<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
return this.client.get(url, config);
}
async post<T>(url: string, data?: any, config?: AxiosRequestConfig): Promise<T> {
return this.client.post(url, data, config);
}
async put<T>(url: string, data?: any, config?: AxiosRequestConfig): Promise<T> {
return this.client.put(url, data, config);
}
async delete<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
return this.client.delete(url, config);
}
}
export const apiClient = new ApiClient();
7.2 Post List Component
<!-- frontend/src/views/PostList.vue -->
<template>
<div class="post-list">
<!-- Filter bar -->
<div class="filters">
<el-input
v-model="searchQuery"
placeholder="Search posts..."
@keyup.enter="handleSearch"
>
<template #append>
<el-button @click="handleSearch">
<el-icon><Search /></el-icon>
</el-button>
</template>
</el-input>
<el-select v-model="selectedCategory" placeholder="Select a category" clearable>
<el-option
v-for="cat in categories"
:key="cat.id"
:label="cat.name"
:value="cat.id"
/>
</el-select>
</div>
<!-- Post list -->
<div class="posts">
<el-skeleton :rows="5" animated v-if="loading" />
<template v-else>
<article
v-for="post in posts"
:key="post.id"
class="post-card"
@click="goToDetail(post.slug)"
>
<div class="post-cover" v-if="post.cover_image">
<img :src="post.cover_image" :alt="post.title" />
</div>
<div class="post-content">
<h2 class="post-title">{{ post.title }}</h2>
<p class="post-summary">{{ post.summary }}</p>
<div class="post-meta">
<span class="author">
<el-avatar :size="24" :src="post.author.avatar" />
{{ post.author.nickname || post.author.username }}
</span>
<span class="date">{{ formatDate(post.created_at) }}</span>
<span class="views">
<el-icon><View /></el-icon>
{{ post.view_count }}
</span>
<span class="comments">
<el-icon><ChatDotRound /></el-icon>
{{ post.comment_count }}
</span>
</div>
<div class="post-tags" v-if="post.tags?.length">
<el-tag
v-for="tag in post.tags"
:key="tag.id"
size="small"
effect="plain"
>
{{ tag.name }}
</el-tag>
</div>
</div>
</article>
</template>
</div>
<!-- Pagination -->
<el-pagination
v-model:current-page="currentPage"
v-model:page-size="pageSize"
:total="total"
:page-sizes="[10, 20, 50]"
layout="total, sizes, prev, pager, next"
@change="handlePageChange"
/>
</div>
</template>
<script setup lang="ts">
import { ref, onMounted, watch } from 'vue';
import { useRouter } from 'vue-router';
import { Search, View, ChatDotRound } from '@element-plus/icons-vue';
import { ElMessage } from 'element-plus';
import { postApi } from '@/api/post';
import type { PostBrief, Category } from '@/types';
const router = useRouter();
// State
const posts = ref<PostBrief[]>([]);
const categories = ref<Category[]>([]);
const loading = ref(false);
const currentPage = ref(1);
const pageSize = ref(20);
const total = ref(0);
const searchQuery = ref('');
const selectedCategory = ref<number | null>(null);

// Fetch one page of posts
const fetchPosts = async () => {
  loading.value = true;
  try {
    const response = await postApi.getList({
      page: currentPage.value,
      page_size: pageSize.value,
      keyword: searchQuery.value || undefined,
      category_id: selectedCategory.value || undefined,
    });
    posts.value = response.items;
    total.value = response.total;
  } catch (error) {
    ElMessage.error('Failed to load posts');
  } finally {
    loading.value = false;
  }
};

// Fetch the category list
const fetchCategories = async () => {
  try {
    categories.value = await postApi.getCategories();
  } catch (error) {
    console.error('Failed to load categories', error);
  }
};

// Search: always restart from the first page
const handleSearch = () => {
  currentPage.value = 1;
  fetchPosts();
};

// Page or page size changed
const handlePageChange = () => {
  fetchPosts();
  window.scrollTo({ top: 0, behavior: 'smooth' });
};

// Navigate to the post detail page
const goToDetail = (slug: string) => {
  router.push(`/post/${slug}`);
};

// Format a date for display
const formatDate = (date: string) => {
  return new Date(date).toLocaleDateString('zh-CN');
};

// Reload from page 1 when the category filter changes
watch([selectedCategory], () => {
  currentPage.value = 1;
  fetchPosts();
});

onMounted(() => {
  fetchPosts();
  fetchCategories();
});
</script>
<style scoped>
.post-list {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
.filters {
display: flex;
gap: 16px;
margin-bottom: 24px;
}
.post-card {
display: flex;
gap: 20px;
padding: 20px;
margin-bottom: 20px;
background: #fff;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
cursor: pointer;
transition: transform 0.2s, box-shadow 0.2s;
}
.post-card:hover {
transform: translateY(-2px);
box-shadow: 0 4px 16px rgba(0, 0, 0, 0.15);
}
.post-cover {
flex-shrink: 0;
width: 200px;
height: 150px;
border-radius: 4px;
overflow: hidden;
}
.post-cover img {
width: 100%;
height: 100%;
object-fit: cover;
}
.post-content {
flex: 1;
display: flex;
flex-direction: column;
}
.post-title {
margin: 0 0 12px;
font-size: 20px;
color: #333;
}
.post-summary {
flex: 1;
margin: 0 0 12px;
color: #666;
line-height: 1.6;
display: -webkit-box;
-webkit-line-clamp: 3;
-webkit-box-orient: vertical;
overflow: hidden;
}
.post-meta {
display: flex;
align-items: center;
gap: 16px;
color: #999;
font-size: 14px;
}
.post-meta .author {
display: flex;
align-items: center;
gap: 8px;
}
.post-tags {
margin-top: 12px;
display: flex;
gap: 8px;
}
</style>
8. Deployment Configuration
8.1 Docker Compose Production Configuration
# docker-compose.prod.yml
version: '3.8'
services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: blog-app
    restart: unless-stopped
    environment:
      - APP_ENV=production
      # create_async_engine needs an async driver in the URL; asyncpg is assumed here
      - DATABASE_URL=postgresql+asyncpg://postgres:${DB_PASSWORD}@db:5432/blog
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - db
      - redis
      - elasticsearch
    volumes:
      - ./static:/app/static
      - ./media:/app/media
    networks:
      - blog-network
    healthcheck:
      # curl must be installed in the application image for this check to work
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  db:
    image: postgres:16-alpine
    container_name: blog-db
    restart: unless-stopped
    environment:
      - POSTGRES_DB=blog
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - blog-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
  redis:
    image: redis:7-alpine
    container_name: blog-redis
    restart: unless-stopped
    volumes:
      - redis_data:/data
    networks:
      - blog-network
  elasticsearch:
    image: elasticsearch:8.11.0
    container_name: blog-es
    restart: unless-stopped
    environment:
      - discovery.type=single-node
      # Security is disabled here; the node is reachable only on the internal network
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - blog-network
  nginx:
    image: nginx:alpine
    container_name: blog-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
      - ./static:/var/www/static:ro
      - ./media:/var/www/media:ro
    depends_on:
      - app
    networks:
      - blog-network
volumes:
  postgres_data:
  redis_data:
  es_data:
networks:
  blog-network:
    driver: bridge
8.2 Kubernetes Deployment Configuration
# k8s/production/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: blog-app
  namespace: blog-production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: blog-app
  template:
    metadata:
      labels:
        app: blog-app
    spec:
      containers:
        - name: app
          image: registry/blog-app:v1.0.0
          ports:
            - containerPort: 8000
          env:
            - name: APP_ENV
              value: "production"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: blog-secrets
                  key: database-url
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: blog-secrets
                  key: secret-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: blog-app-service
  namespace: blog-production
spec:
  selector:
    app: blog-app
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: blog-ingress
  namespace: blog-production
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  # Replaces the deprecated kubernetes.io/ingress.class annotation
  ingressClassName: nginx
  tls:
    - hosts:
        - blog.example.com
      secretName: blog-tls
  rules:
    - host: blog.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: blog-app-service
                port:
                  number: 80
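Pitfalls to Avoid
1. Database connection pool exhaustion
# Problem: the pool runs out of connections under high concurrency
engine = create_async_engine(DATABASE_URL)  # default pool settings
# Better: size the pool explicitly
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # connections kept open
    max_overflow=10,     # extra connections allowed under burst load
    pool_pre_ping=True,  # test a connection before handing it out
    pool_recycle=3600,   # recycle connections older than one hour
    pool_timeout=30      # seconds to wait for a free connection
)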
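Pool settings apply per engine, which in practice means per worker process, so the total number of connections the database may see multiplies quickly. The sketch below works through that arithmetic for the pool settings above and the three replicas from the Kubernetes manifest. The worker count of four per pod is an assumption made for illustration, and 100 is PostgreSQL's default `max_connections`.

```python
def peak_connections(
    pool_size: int, max_overflow: int, workers_per_pod: int, replicas: int
) -> int:
    """Upper bound on connections opened by all application processes."""
    per_process = pool_size + max_overflow
    return per_process * workers_per_pod * replicas


def fits_budget(peak: int, max_connections: int, reserved: int = 10) -> bool:
    """Check the peak against the server limit, keeping some slots for admin use."""
    return peak <= max_connections - reserved


# Assumed deployment: 4 workers per pod (hypothetical), 3 replicas
peak = peak_connections(pool_size=20, max_overflow=10, workers_per_pod=4, replicas=3)
print(peak)                                     # (20 + 10) * 4 * 3 = 360
print(fits_budget(peak, max_connections=100))   # exceeds the PostgreSQL default
```

When the peak exceeds the server limit, the usual options are a smaller per-process pool, a higher `max_connections`, or a connection pooler such as PgBouncer in front of the database.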
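2. The N+1 query problem
# Problem: one extra query per post
result = await db.execute(select(Post))
for post in result.scalars():
    # Lazy-loads the author for every row; with AsyncSession this
    # implicit I/O raises MissingGreenlet instead of running the query
    print(post.author.name)
# Better: eager-load the relationships with selectinload
from sqlalchemy.orm import selectinload
result = await db.execute(
    select(Post)
    .options(selectinload(Post.author), selectinload(Post.tags))
)
posts = result.scalars().all()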
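3. Cache penetration and cache avalanche
# Problem: cache penetration (lookups for data that does not exist)
# `cache` and `bloom_filter` are placeholders for application-level helpers
async def get_post(post_id: int):
    # A missing post is in neither the cache nor the database,
    # so every request for it reaches the database
    post = await cache.get(f"post:{post_id}")
    if not post:
        post = await db.get(Post, post_id)
        await cache.set(f"post:{post_id}", post)
    return post
# Better: cache empty results, add a Bloom filter, and randomize the TTL
import random
async def get_post(post_id: int):
    cache_key = f"post:{post_id}"
    # The Bloom filter is assumed to hold the keys of all existing posts
    if not await bloom_filter.exists(cache_key):
        return None
    post = await cache.get(cache_key)
    if post == "__NULL__":
        return None
    if post:
        return post
    post = await db.get(Post, post_id)
    if post:
        # Random jitter spreads expirations out and guards against an avalanche
        await cache.set(cache_key, post, expire=3600 + random.randint(0, 300))
    else:
        # Cache the miss briefly to stop penetration
        await cache.set(cache_key, "__NULL__", expire=60)
    return post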
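A cache avalanche happens when many keys written at the same moment also expire at the same moment, sending a burst of misses to the database. One common mitigation is to add random jitter to each TTL. The sketch below shows the idea in isolation; `jittered_ttl` and the 10% spread are illustrative choices, not values from the project.

```python
import random


def jittered_ttl(base_seconds: int, spread: float = 0.1) -> int:
    """Return the base TTL shifted by a random amount within +/- spread."""
    delta = int(base_seconds * spread)
    return base_seconds + random.randint(-delta, delta)


# 1,000 keys cached together now expire over a 12-minute window
# (3240 to 3960 seconds) instead of all at exactly 3600 seconds
ttls = [jittered_ttl(3600) for _ in range(1000)]
print(min(ttls), max(ttls))
```

The spread is a trade-off: a wider window smooths the load more, but makes the effective freshness of cached data less predictable.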
4. Insecure file uploads
# Problem: unsafe upload handling
@app.post("/upload")
async def upload(file: UploadFile):
    with open(f"/uploads/{file.filename}", "wb") as f:
        f.write(await file.read())  # dangerous: a crafted filename can overwrite system files
# Better: validate the upload before writing it
import uuid
from pathlib import Path
import magic  # provided by the python-magic package
from fastapi import HTTPException, UploadFile
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5 MB
@app.post("/upload")
async def upload(file: UploadFile):
    # Read at most one byte past the limit so an oversized
    # upload is never loaded into memory in full
    content = await file.read(MAX_FILE_SIZE + 1)
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(400, "File too large")
    # Check the extension (the filename may be missing)
    ext = Path(file.filename or "").suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Unsupported file type")
    # Use magic to check the real content type
    mime = magic.from_buffer(content, mime=True)
    if not mime.startswith('image/'):
        raise HTTPException(400, "Invalid image file")
    # Generate a safe filename; the client-supplied name is never used on disk
    filename = f"{uuid.uuid4()}{ext}"
    filepath = Path("/uploads") / filename[:2] / filename[2:4] / filename
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, "wb") as f:
        f.write(content)
    return {"url": f"/media/{filename[:2]}/{filename[2:4]}/{filename}"}
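To see why a client-supplied filename must never be joined onto the upload directory, it helps to look at what path normalization does with `..` segments. The sketch below uses only `posixpath` string operations and touches no files; `resolve_upload_path` and `is_inside_upload_dir` are hypothetical helper names.

```python
import posixpath

UPLOAD_DIR = "/uploads"


def resolve_upload_path(filename: str) -> str:
    """Join a client-supplied name onto the upload directory and normalize it."""
    return posixpath.normpath(posixpath.join(UPLOAD_DIR, filename))


def is_inside_upload_dir(filename: str) -> bool:
    """Return True only if the normalized path stays below UPLOAD_DIR."""
    return resolve_upload_path(filename).startswith(UPLOAD_DIR + "/")


print(resolve_upload_path("cat.png"))            # stays inside /uploads
print(resolve_upload_path("../../etc/passwd"))   # escapes to /etc/passwd
print(is_inside_upload_dir("/etc/passwd"))       # absolute names escape as well
```

Generating the stored name on the server, as the UUID-based version does, sidesteps the problem because no part of the path comes from the client.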
5. Time zone handling
# Problem: mixed-up time zones
from datetime import datetime
created_at = datetime.now()  # naive server-local time, unreliable
# Better: always work in UTC
from datetime import datetime, timezone
created_at = datetime.now(timezone.utc)  # timezone-aware UTC
# Store timestamps in UTC
# Convert to the user's time zone only for display
from zoneinfo import ZoneInfo
def to_user_timezone(dt: datetime, user_tz: str = "Asia/Shanghai") -> datetime:
    return dt.astimezone(ZoneInfo(user_tz))
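A short usage sketch of the store-in-UTC, convert-for-display rule follows. The timestamp is an arbitrary example value, and the code assumes the system provides IANA time zone data (on platforms without it, the `tzdata` package supplies it).

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def to_user_timezone(dt: datetime, user_tz: str = "Asia/Shanghai") -> datetime:
    """Convert an aware datetime to the user's time zone for display."""
    return dt.astimezone(ZoneInfo(user_tz))


# Value as it would be stored: timezone-aware UTC
stored = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)

# Value as it would be shown to a user in Shanghai (UTC+8, no DST)
local = to_user_timezone(stored)
print(local.isoformat())  # 2024-01-01T08:00:00+08:00

# Both objects denote the same instant; only the presentation differs
print(local == stored)
```

Note that calling `astimezone` on a naive datetime makes Python assume the server's local zone, which is one more reason to create timestamps with `datetime.now(timezone.utc)`.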
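Exercises
Exercise 1: Feature extensions
- Implement automatic saving of post drafts
- Add a version history for posts
- Implement post import and export (Markdown/PDF)
- Add collaborative editing of posts
Exercise 2: Performance optimization
- Optimize database queries (add indexes, fix slow queries)
- Configure a Redis caching strategy
- Serve images through a CDN
- Add read/write splitting for the database
Exercise 3: Security hardening
- Implement API rate limiting
- Add XSS and CSRF protection
- Configure protection against SQL injection
- Store sensitive data encrypted
Exercise 4: Full deployment
- Write the Dockerfile and docker-compose.yml
- Set up a CI/CD pipeline
- Deploy to a cloud server
- Configure monitoring and alerting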
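Summary
In this lecture we built an enterprise-grade blog system end to end:
- Architecture design: from requirements analysis to technology selection, arriving at a clear system architecture
- Data modeling: a complete database model covering users, posts, categories, tags, and comments
- API development: RESTful APIs built with FastAPI that implement the full business logic
- Frontend integration: how a Vue.js frontend connects to the backend API
- Deployment practice: deployment configurations for Docker and Kubernetes
The project touches most areas of modern web development and works well as a capstone exercise. From here you can keep extending it, for example with more social features, a better search experience, or real-time notifications.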