根据项目结构创建项目

在这里插入图片描述
在这里插入图片描述

创建模块化路由

在这里插入图片描述
在这里插入图片描述在这里插入图片描述
routers/news.py 文件

from fastapi import APIRouter

# 创建 APIRouter 实例
router = APIRouter(prefix="/api/news", tags=["news"])

@router.get("/categories")
async def get_categories():
    return {"msg": "Categories fetched"}  

挂载路由

在main.py中挂在刚创建的news.py路由
在这里插入图片描述
main.py文件代码

from fastapi import FastAPI
from routers import news
app = FastAPI()
@app.get("/")
async def root():
    return {"message": "Hello World"}

# 注册路由
app.include_router(news.router)

项目中数据库建表和ORM配置安装

数据库建表

下载本项目建表SQL(自己的项目自己去写SQL建表文件不用下载)
点击下载本项目建表SQL
链接: 提取码: ylda

运行SQL建表

第一种方式在pyCharm中导入Sql文件建表,
链接数据库
在这里插入图片描述
在这里插入图片描述
运行SQL
在这里插入图片描述
第二种方式在Navicat中导入Sql文件建表,
因为我自己的SQL中写入了建news_app库的语句,所有不用手动在Navicat创建news_app库。直接在MYSQL中运行sqL文件就行
在这里插入图片描述
在这里插入图片描述
运行完后就创建成功了(让AI写自己项目中的SQL建表文件 。如果运行操作,一般是数据库版本的问题,让AI给你修改成你自己数据库版本的sql语句)
在这里插入图片描述

ORM配置安装

项目中安装ORM

加引号或者不加都行,如果不加引号安装报错就用加引号的

pip install sqlalchemy[asyncio] aiomysql
pip install 'sqlalchemy[asyncio]' aiomysql

我的项目一定安装过了
在这里插入图片描述
正确的安装成功
在这里插入图片描述

项目配置ORM

在这里插入图片描述
代码如下

ASYNC_DATABASE_URL为数据库url
root:630229 为数据库用户名和密码改成自己的
news_app:为数据库名称改成自己的


from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession, create_async_engine

# 数据库URL
ASYNC_DATABASE_URL = "mysql+aiomysql://root:630229@localhost:3306/news_app?charset=utf8mb4"

# 创建异步引擎
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,  # 可选:输出SQL日志
    pool_size=10,  # 设置连接池中保持的持久连接数
    max_overflow=20  # 设置连接池允许创建的额外连接数
)

# 创建异步会话
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# 依赖项,用于获取数据库会话
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

完成以上操作基本环境和配置就完成了,。接下来咱们一起写第一个接口吧

接口实现流程

在这里插入图片描述
咱们写一个新闻分类接口先看接口文档
在这里插入图片描述

定义路由

在这里插入图片描述

routers/news.py文件

from fastapi import APIRouter

# 创建 APIRouter 实例
# prefix: 路由前缀(API接口规范文档)
# tags: 分组标签
router = APIRouter(prefix="/api/news", tags=["news"])

# 接口实现流程:
# 1. 模块化路由 API 接口规范文档
# 2. 定义模型类/数据库表(数据库设计文档)
# 3. 在 crud 文件夹里面创建文件,封装操作数据库的方法
# 4. 在路由处理函数里面调用 crud 封装好的方法,响应结果

@router.get("/categories")
async def get_categories(skip: int = 0, limit: int = 108):
   #先获取数据库里面新闻分类数据 >先定义模型类>封装查询数据的方法
    return {
        "code": 200,
        "message": "获取新闻分类成功",
        "data": "新闻分类列表"   # 此处应为实际分类数据,示例为字符串
    }

定义模型类(模型类必须参考当前数据库的表)一一对应

在这里插入图片描述
在modes/news.py中创建模型类。注意模型类必须参考数据库表一一对应上

在这里插入图片描述
根据表创建模型类
在这里插入图片描述
modes/news.py文件代码

#models层
#models:SQLAlchemy ORM 模型【映射 MySQL 真实表 news_category】,只定义表结构,不写查询逻辑

#为什么必须分层写成这样?
#解耦:改数据库字段只改 models,改返回字段只改 schemas,改 SQL 只改 crud;
#复用:crud 查询方法可被新增 / 编辑 / 删除接口多处调用;
#安全:schemas 过滤字段,不会把数据库所有字段(创建时间等)意外暴露前端;
#易维护:项目变大,按文件夹拆分,不会所有代码堆在一个 py 文件。

from datetime import  datetime

from sqlalchemy import DateTime, String, Integer
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
## 所有数据库表的父类,公共字段抽离:创建时间、更新时间
#Base抽取公共时间字段,所有数据表继承后自动带上创建 / 更新时间,不用重复写;
class Base(DeclarativeBase):
    # 每条数据新增自动填充当前时间
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.now,
        comment="创建时间"
    )
    # 新增默认时间,数据修改时自动刷新为当前时间
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
        comment="更新时间"
    )
# 映射数据库真实表:news_category
#Category = 数据库 news_category 表在代码里的镜像,字段类型、约束和数据库一一对应;
class Category(Base):
    __tablename__ = "news_category" # 绑定数据库表名,必须和库中表名一致

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="分类ID")
    name: Mapped[str] = mapped_column(String(50), nullable=False, comment="分类名称") # varchar(50)非空
    sort_order: Mapped[int] = mapped_column(Integer, default=0, comment="排序顺序") # 默认值0

#这个是ORM 数据库模型,只负责描述表结构,不能用来接收前端参数,前端参数交给 schemas。

分装数据库查询方法

crud文件夹
在这里插入图片描述

#crud:数据库操作层【封装所有 SQL 查询、增删改,路由不能直接操作数据库】
# CRUD 层设计意义
# 所有数据库查询统一放 crud,路由只调用函数:
# 后续改排序规则、改分页逻辑,只改 crud,不用动所有接口代码;
# 复用:新增 / 编辑分类接口也能调用本查询方法,避免重复写 SQL。

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from typing import List, Optional

from models.news import Category # 导入数据库映射模型
from schemas.news import CategoryCreate, CategoryUpdate

# 分页查询分类列表,入参:数据库会话db、跳过条数skip、每页条数limit

async def get_categories(db: AsyncSession, skip: int = 0, limit: int = 10) -> List[Category]:
    """获取分类列表(分页)"""
    # 1.拼装SQL语句:SELECT * FROM news_category ORDER BY sort_order LIMIT limit OFFSET skip
    # select(Category):等价SELECT * FROM news_category,ORM 语法生成查询 SQL
    # .order_by(Category.sort_order):ORDER BY sort_order,按排序字段升序排列
    # .offset(skip):OFFSET skip,跳过前面 skip 条数据(分页用:skip=(页码 - 1)* 每页条数)
    # .limit(limit):LIMIT limit,只查询 limit 条数据,控制每页数量

    stmt = select(Category).order_by(Category.sort_order).offset(skip).limit(limit)
    # 2.异步执行SQL,db.execute执行查询语句
    #   # await db.execute(stmt):异步执行 SQL,FastAPI 推荐异步数据库,并发性能更高
    result = await db.execute(stmt)
    # 3.scalars():剥离SQL元数据,只取出Category对象;all()转列表
    # .scalars().all():execute 返回的是封装结果集,scalars 提取 ORM 实体对象(Category 实例),all 转为列表
    return result.scalars().all()



async def get_categories_count(db: AsyncSession) -> int:
    """获取分类总数"""
    from sqlalchemy import func
    result = await db.execute(select(func.count(Category.id)))
    return result.scalar_one()

在routers/news.py中调用crud中分装的查询方法get_categories()

在这里插入图片描述
在这里插入图片描述

#routers:接口路由层【接收前端参数、依赖注入数据库、调用 crud、组装返回格式】
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List

from config.db_conf import get_db # 获取数据库会话的依赖函数
from crud import news as crud_news  # 导入crud数据库操作
from schemas.news import (
    CategoryCreate,
    CategoryUpdate,
    CategoryResponse
) # 导入Pydantic出参模型

# 创建APIRouter实例
#接口实现流程
#1.模块化路由 API接口规范文档
#2.定义模型类 数据库表(数据库设计文档)
# #3.在 crud 文件夹里面创建文件,封装操作数据库的方法
# #4.在路由处理函数里面调用 crud封装好的方法,响应结果

# 模块化路由,统一前缀、接口分组
# tags 分组标签
router = APIRouter(prefix="/api/news", tags=["news"])


# ==================== 分类接口 ====================
# 分页获取分类列表接口:GET /api/news/categories
@router.get("/categories", response_model=dict)
 ##  db: AsyncSession = Depends(get_db) 依赖注入:自动创建/回收数据库连接
async def get_categories(skip: int = 0,limit: int = 10,db: AsyncSession = Depends(get_db)):
    """获取新闻分类列表"""
    try:
        # 调用crud方法查数据库,拿到ORM列表:List[Category]
        categories = await crud_news.get_categories(db, skip=skip, limit=limit)
        # # 查询分类总条数,前端分页需要total
        total = await crud_news.get_categories_count(db)


        return {
            "code": 200,
            "message": "获取新闻分类成功",
            "data": {
                "total": total,
                 # 重点:ORM对象 → Pydantic模型序列化,过滤多余字段
                "items": [CategoryResponse.model_validate(c) for c in categories]
            }
        }
    except Exception as e:
        # 异常捕获,接口返回500错误信息
        raise HTTPException(status_code=500, detail=f"获取分类失败:{str(e)}")

测试接口成功
在这里插入图片描述
测试前端如果需要调用接口还需要配置跨域
在这里插入图片描述

from fastapi import FastAPI
from routers import news
from fastapi.middleware.cors import CORSMiddleware #配置跨域
app = FastAPI()

#跨域
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], #允许的源
    allow_credentials=True, #允许携带cookie
    allow_methods=["*"], #允许的请求方法
    allow_headers=["*"], #允许的请求头
)


@app.get("/")
async def root():
    return {"message": "Hello World"}


# 注册路由
app.include_router(news.router)

测试的接口以完成,前端也可以调用成功,今天用FASTAPI写完一个接口,就到这里

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐