routers/news.py

#routers:接口路由层【接收前端参数、依赖注入数据库、调用 crud、组装返回格式】
from fastapi import APIRouter, Depends, HTTPException,Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from pydantic import BaseModel
from config.db_conf import get_db # 获取数据库会话的依赖函数
from crud import news as crud_news  # 导入crud数据库操作
from schemas.news import (
    CategoryCreate,
    CategoryUpdate,
    CategoryResponse
) # 导入Pydantic出参模型
# 获取分类下的文章列表接口:GET /api/news/list
@router.get("/list")
#alias起的别名
async def getNewsList(
        category_id: int =Query(..., alias="categoryId"), # 必填查询参数,前端用 categoryId
        page: int = 1,   # 页码,默认 1
        page_size: int = Query(10, alias="pageSize",le=100) , # 每页条数,默认 10,最大 100
        db: AsyncSession = Depends(get_db)  # 依赖注入数据库会话
):
    """获取新闻列表(分页)"""
    ##思路:处理分页规则查询新闻列表 >计算总量 >计算是否还有更多
    try:
        # 计算跳过记录数(偏移量)
        skip = (page - 1) * page_size
        # 调用 CRUD 获取当前页的新闻列表(返回 ORM 对象列表)
        news_list = await crud_news.get_news_list(db, category_id, skip, page_size)
        # 调用 CRUD 获取该分类下的新闻总条数
        total = await crud_news.get_news_count(db, category_id)
        # 判断是否还有下一页数据
        has_more = (skip + len(news_list)) < total

        return {
            "code": 200,
            "message": "获取新闻列表成功",
            "data": {
                "total": total,
                "has_more": has_more,
                "list": news_list
                # "list": [NewsResponse.model_validate(news) for news in news_list]
            }
        }
    except Exception as e:
        # 可以根据需要记录日志
        raise HTTPException(status_code=500, detail=f"获取新闻列表失败: {str(e)}")


# 文章详情接口:GET /api/news/detail
@router.get("/detail")
async def getNewsDetail(
        id: int = Query(..., alias="id"),
        db: AsyncSession = Depends(get_db)
):
    """获取新闻详情"""
    try:
        # 调用 CRUD 获取新闻详情
        news = await crud_news.get_news_detail(db, id)
        if not news:
            raise HTTPException(status_code=404, detail="新闻不存在")
        #调用 CRUD 获取新闻浏览量
        news.views = await crud_news.get_news_views(db, news.id)
        await db.refresh(news)
        # if not news.views:
        #     raise HTTPException(status_code=404, detail="获取新闻浏览量失败")
        #调用 CRUD 获取同类的相关推介新闻
        news.related_news = await crud_news.get_related_news(db, news.id, news.category_id)
        return {
            "code": 200,
            "message": "获取新闻详情成功",
            "data": news
        }
    except Exception as e:
        # 可以根据需要记录日志
        raise HTTPException(status_code=500, detail=f"获取新闻详情失败: {str(e)}")

crud/news.py

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select,func,update
from sqlalchemy.orm import selectinload
from typing import List, Optional

from models.news import Category,News # 导入数据库映射模型
from schemas.news import CategoryCreate, CategoryUpdate
#查询ID分类下的文章列表
async def get_news_list(db: AsyncSession, category_id: int, skip: int = 0, limit: int = 10) -> List[News]:
    """获取分类下的文章列表(分页)"""
    # # 构建查询语句:SELECT * FROM news WHERE category_id = :id OFFSET :skip LIMIT :limit
    stmt = select(News).where(News.category_id == category_id).offset(skip).limit(limit)
    #缺少 order_by,结果顺序不确定,建议加上 .order_by(News.publish_time.desc())
    result = await db.execute(stmt)
    # 提取所有标量(News 对象),返回列表
    return result.scalars().all ()  # 实际返回 List[News]

#当前ID下的新闻数量
async def get_news_count(db: AsyncSession, category_id: int) -> int:
    """获取分类下的文章数量"""
    #func.count()聚合计算
    # # 构建计数查询:SELECT COUNT(news.id) FROM news WHERE category_id = :id
    stmt = select(func.count(News.id)).where(News.category_id == category_id)
    result = await db.execute(stmt)
    return result.scalar_one() #  scalar_one()返回单个结果 scalar_one_or_none()返回单个结果或None

#查询文章详情
async def get_news_detail(db: AsyncSession, news_id: int):
    """获取文章详情"""
    stmt = select(News).where(News.id == news_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()
# 查询文章浏览量
async def get_news_views(db: AsyncSession, news_id: int):
    """获取文章浏览量"""
    update_stmt = update(News).where(News.id == news_id).values(views=News.views + 1)
    await db.execute(update_stmt)
    # await db.commit()
    # #更新:检查数据库是否真的命中了数据 ,命中了返回True
    # return result.rowcount > 0


#获取同类的相关推介新闻
async def get_related_news(db: AsyncSession, news_id: int, category_id: int, limit: int = 10):
    """获取同类新闻"""
    #order_by 排序 浏览量和发布时间 ,News.views.desc()默认是升序
    stmt = select(News).where(News.category_id == category_id).where(News.id != news_id).order_by(News.views.desc(),News.publish_time.desc()).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()



models/news.py

from datetime import  datetime

from sqlalchemy import DateTime, String, Integer,Text,ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.schema import Index
from typing import Optional
## 所有数据库表的父类,公共字段抽离:创建时间、更新时间
#Base抽取公共时间字段,所有数据表继承后自动带上创建 / 更新时间,不用重复写;
class Base(DeclarativeBase):
    # 每条数据新增自动填充当前时间
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.now,
        comment="创建时间"
    )
    # 新增默认时间,数据修改时自动刷新为当前时间
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
        comment="更新时间"
    )


#定义数据表news的模型类
class News(Base):

    #创建索引:提高查询效率
    __table_args__ = (
        Index("fk_news_category_idx", "category_id"), #高频查询场景,外键列索引,加速关联查询
        Index("idx_publish_time", "publish_time"), #按发布时间查询,发布时间索引,加速排序/过滤
    )


    __tablename__ = "news"  # 数据库表名
    # 新闻 ID,主键,自增
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="新闻ID")
    # 新闻标题,非空,长度 255
    title: Mapped[str] = mapped_column(String(255), nullable=False, comment="新闻标题")
    # 新闻简介,可选,长度 500
    description: Mapped[Optional[str]] = mapped_column(String(500), comment="新闻简介")
    # 新闻内容,文本类型,非空
    content: Mapped[str] = mapped_column(Text, nullable=False, comment="新闻内容")
    # 新闻图片 URL,可选
    image: Mapped[Optional[str]] = mapped_column(String(255), comment="新闻图片")
    # 新闻作者,可选,长度 50
    author: Mapped[Optional[str]] = mapped_column(String(50), comment="新闻作者")
    # 新闻分类 ID,外键关联 news_category 表的 ID,非空
    category_id: Mapped[int] = mapped_column(Integer, ForeignKey("news_category.id"), nullable=False, comment="新闻分类ID")
    # 新闻浏览量,默认 0,非空
    views: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="浏览量")
    # 发布时间,默认当前时间,非空
    publish_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, comment="发布时间")

    def __repr__(self):
        return f"<News(id={self.id}, title={self.title}, views={self.views})>"

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐