用python (fastapi)做项目第二天实现新闻列表和新闻详情接口
·
routers/news.py
#routers:接口路由层【接收前端参数、依赖注入数据库、调用 crud、组装返回格式】
from fastapi import APIRouter, Depends, HTTPException,Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from pydantic import BaseModel
from config.db_conf import get_db # 获取数据库会话的依赖函数
from crud import news as crud_news # 导入crud数据库操作
from schemas.news import (
CategoryCreate,
CategoryUpdate,
CategoryResponse
) # 导入Pydantic出参模型
# 获取分类下的文章列表接口:GET /api/news/list
@router.get("/list")
#alias起的别名
async def getNewsList(
category_id: int =Query(..., alias="categoryId"), # 必填查询参数,前端用 categoryId
page: int = 1, # 页码,默认 1
page_size: int = Query(10, alias="pageSize",le=100) , # 每页条数,默认 10,最大 100
db: AsyncSession = Depends(get_db) # 依赖注入数据库会话
):
"""获取新闻列表(分页)"""
##思路:处理分页规则查询新闻列表 >计算总量 >计算是否还有更多
try:
# 计算跳过记录数(偏移量)
skip = (page - 1) * page_size
# 调用 CRUD 获取当前页的新闻列表(返回 ORM 对象列表)
news_list = await crud_news.get_news_list(db, category_id, skip, page_size)
# 调用 CRUD 获取该分类下的新闻总条数
total = await crud_news.get_news_count(db, category_id)
# 判断是否还有下一页数据
has_more = (skip + len(news_list)) < total
return {
"code": 200,
"message": "获取新闻列表成功",
"data": {
"total": total,
"has_more": has_more,
"list": news_list
# "list": [NewsResponse.model_validate(news) for news in news_list]
}
}
except Exception as e:
# 可以根据需要记录日志
raise HTTPException(status_code=500, detail=f"获取新闻列表失败: {str(e)}")
# 文章详情接口:GET /api/news/detail
@router.get("/detail")
async def getNewsDetail(
id: int = Query(..., alias="id"),
db: AsyncSession = Depends(get_db)
):
"""获取新闻详情"""
try:
# 调用 CRUD 获取新闻详情
news = await crud_news.get_news_detail(db, id)
if not news:
raise HTTPException(status_code=404, detail="新闻不存在")
#调用 CRUD 获取新闻浏览量
news.views = await crud_news.get_news_views(db, news.id)
await db.refresh(news)
# if not news.views:
# raise HTTPException(status_code=404, detail="获取新闻浏览量失败")
#调用 CRUD 获取同类的相关推介新闻
news.related_news = await crud_news.get_related_news(db, news.id, news.category_id)
return {
"code": 200,
"message": "获取新闻详情成功",
"data": news
}
except Exception as e:
# 可以根据需要记录日志
raise HTTPException(status_code=500, detail=f"获取新闻详情失败: {str(e)}")
crud/news.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select,func,update
from sqlalchemy.orm import selectinload
from typing import List, Optional
from models.news import Category,News # 导入数据库映射模型
from schemas.news import CategoryCreate, CategoryUpdate
#查询ID分类下的文章列表
async def get_news_list(db: AsyncSession, category_id: int, skip: int = 0, limit: int = 10) -> List[News]:
"""获取分类下的文章列表(分页)"""
# # 构建查询语句:SELECT * FROM news WHERE category_id = :id OFFSET :skip LIMIT :limit
stmt = select(News).where(News.category_id == category_id).offset(skip).limit(limit)
#缺少 order_by,结果顺序不确定,建议加上 .order_by(News.publish_time.desc())
result = await db.execute(stmt)
# 提取所有标量(News 对象),返回列表
return result.scalars().all () # 实际返回 List[News]
#当前ID下的新闻数量
async def get_news_count(db: AsyncSession, category_id: int) -> int:
"""获取分类下的文章数量"""
#func.count()聚合计算
# # 构建计数查询:SELECT COUNT(news.id) FROM news WHERE category_id = :id
stmt = select(func.count(News.id)).where(News.category_id == category_id)
result = await db.execute(stmt)
return result.scalar_one() # scalar_one()返回单个结果 scalar_one_or_none()返回单个结果或None
#查询文章详情
async def get_news_detail(db: AsyncSession, news_id: int):
"""获取文章详情"""
stmt = select(News).where(News.id == news_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()
# 查询文章浏览量
async def get_news_views(db: AsyncSession, news_id: int):
"""获取文章浏览量"""
update_stmt = update(News).where(News.id == news_id).values(views=News.views + 1)
await db.execute(update_stmt)
# await db.commit()
# #更新:检查数据库是否真的命中了数据 ,命中了返回True
# return result.rowcount > 0
#获取同类的相关推介新闻
async def get_related_news(db: AsyncSession, news_id: int, category_id: int, limit: int = 10):
"""获取同类新闻"""
#order_by 排序 浏览量和发布时间 ,News.views.desc()默认是升序
stmt = select(News).where(News.category_id == category_id).where(News.id != news_id).order_by(News.views.desc(),News.publish_time.desc()).limit(limit)
result = await db.execute(stmt)
return result.scalars().all()
models/news.py
from datetime import datetime
from sqlalchemy import DateTime, String, Integer,Text,ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.schema import Index
from typing import Optional
## 所有数据库表的父类,公共字段抽离:创建时间、更新时间
#Base抽取公共时间字段,所有数据表继承后自动带上创建 / 更新时间,不用重复写;
class Base(DeclarativeBase):
# 每条数据新增自动填充当前时间
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.now,
comment="创建时间"
)
# 新增默认时间,数据修改时自动刷新为当前时间
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.now,
onupdate=datetime.now,
comment="更新时间"
)
#定义数据表news的模型类
class News(Base):
#创建索引:提高查询效率
__table_args__ = (
Index("fk_news_category_idx", "category_id"), #高频查询场景,外键列索引,加速关联查询
Index("idx_publish_time", "publish_time"), #按发布时间查询,发布时间索引,加速排序/过滤
)
__tablename__ = "news" # 数据库表名
# 新闻 ID,主键,自增
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="新闻ID")
# 新闻标题,非空,长度 255
title: Mapped[str] = mapped_column(String(255), nullable=False, comment="新闻标题")
# 新闻简介,可选,长度 500
description: Mapped[Optional[str]] = mapped_column(String(500), comment="新闻简介")
# 新闻内容,文本类型,非空
content: Mapped[str] = mapped_column(Text, nullable=False, comment="新闻内容")
# 新闻图片 URL,可选
image: Mapped[Optional[str]] = mapped_column(String(255), comment="新闻图片")
# 新闻作者,可选,长度 50
author: Mapped[Optional[str]] = mapped_column(String(50), comment="新闻作者")
# 新闻分类 ID,外键关联 news_category 表的 ID,非空
category_id: Mapped[int] = mapped_column(Integer, ForeignKey("news_category.id"), nullable=False, comment="新闻分类ID")
# 新闻浏览量,默认 0,非空
views: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="浏览量")
# 发布时间,默认当前时间,非空
publish_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, comment="发布时间")
def __repr__(self):
return f"<News(id={self.id}, title={self.title}, views={self.views})>"
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)