目录

1.引言

2.最基本的crud

(1)C — Create(新增 / 插入数据)

直接后端插入数据

通过输入插入数据

核心知识点

(2)D — Delete(删除)

【不用 await】

【必须 await】

(3)U — Update(修改)

核心知识点

(4)R — Retrieve(查询)

整体查询

条件查询

模糊查询

分页查询

核心知识点

统计查询

核心知识点

总结


1.引言

1.Python中ORM基础启动连接步骤中我们讲完了orm基础连接,现在我们完成连接后,我们要进行对数据库的增加删改查(也称为orm的crud),本次我们只讲单表方面的crud代码。上一篇文章我们在最后加入了路由,所以我们需要另外创建一个新文件在新文件下面新建python文件来编写路由代码进行增删改查。

现在项目文件长这样

2.最基本的crud

(1)C — Create(新增 / 插入数据)

增加插入数据分为两种情况:

  • 直接后端插入数据

@orm_router.get("/add")
async def add_user(session: AsyncSession = Depends(get_session)):
    user = User(name='xiaoyang',age='21',gender='男')
    session.add(user)
    return user
  • 通过输入插入数据

@orm_router.get("/add2")
async def add_user(name1: str, age1: int, gender1: str, session: AsyncSession = Depends(get_session)):
    user = User(name=name1,age=age1,gender=gender1)
    session.add(user)
    return {name1: user.name, age1: user.age, gender1: user.gender}
#id自增不用加,c-t和u_t写在基础模型里面,每次建表自动写入
核心知识点
  • id 自增,不用传

  • c_t、u_t 自动生成时间,不用传

  • db.add() 只是标记,commit 才真正入库(以为我在main文件里面写入了依赖系统所以不用再commit真正入库)

  • 非空字段必须传值

(2)D — Delete(删除)

@orm_router.post("/drop")
async def drop_user(id: int, session: AsyncSession = Depends(get_session)):
    user = await session.get(User,id)
    if user:
        await session.delete(user)
    return {"message": "用户删除成功"}

需要特别注意的是删除必须先查(session.get(User,id))再删除(session.delete(user))

在这里我遇到的问题是是否加入await:

增改只标记,不用 await; 查询删除提交,必须 await!操作内存,不碰数据库不用上await

【不用 await】

  • session.add(user)

  • 创建对象

  • 修改对象属性

  • 只是标记操作

【必须 await】

  • await session.get(...)

  • await session.execute(...)

  • await session.delete(...)

  • await session.flush()

  • await session.commit()

(3)U — Update(修改)

修改和删除一样必须先查再改,同时别忘了查时加入await

@orm_router.post("/update")
async def update_user(id: int, name1: str, age1: int, gender1: str, session: AsyncSession = Depends(get_session)):
    user = await session.get(User,id)
    if user:
        user.name=name1
        user.age=age1
        user.gender=gender1
    else:
        return {"message": "用户不存在"}
    return {"message": "用户更新成功"}
核心知识点
  • ORM 会自动追踪变化

  • 只要改对象属性,ORM 就知道要更新

  • u_t 更新时间自动刷新,不用手动改

  • 必须先查,否则无法修改

(4)R — Retrieve(查询)

整体查询

#全员查询
@orm_router.post("/retrieve2")
async def retrieve( session: AsyncSession = Depends(get_session)):
    stmt= select(User)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list

条件查询

# 1. 构建查询:查User表,满足条件
stmt = select(User).where(条件)
# 2. 执行查询
result = await session.execute(stmt)
# 3. 取出结果
data = result.scalars().all()
***********************************************************************
#条件查询
@orm_router.post("/retrieve1")
async def retrieve(id: int, session: AsyncSession = Depends(get_session)):
    stmt=select(User).where(User.id==id)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list

模糊查询

代码模板

# 查询名字包含 关键词
stmt = select(User).where(User.name.like(f"%{关键词}%"))
result = await session.execute(stmt)
data = result.scalars().all()
**********************************************************************
@orm_router.post("/retrieve3")
async def retrieve(name2: str, session: AsyncSession = Depends(get_session)):
    user=select(User).where(User.name.like(f'%{name2}%'))
    result=await session.execute(user)
    stu_list=result.scalars().all()
    return stu_list
#查询带{name2}文字的信息

分页查询

核心知识点
  1. 公式:

  • 偏移量 offset = (页码 - 1) * 每页条数
  • limit:每页多少条
  1. 顺序:select().offset(偏移).limit(条数)

@orm_router.post("/page")
async def page(page: int, size: int, session: AsyncSession = Depends(get_session)):
    stmt=select(User).offset((page-1)*size).limit(size)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list

统计查询

核心知识点
  1. 必须导入 from sqlalchemy import func

  2. func.count(字段):统计条数

  3. func.avg(字段):平均值

  4. func.max/min:最大最小

  5. 结果用 .scalar() 取单个数值,不用 .all()

@orm_router.post("/tngji")
async def tngji(session: AsyncSession = Depends(get_session)):
    age1=select(func.avg(User.age))
    result=await session.execute(age1)
    stu_list=result.scalars().all()
    return stu_list

总结

这些看完我们的Orm单表CRUD基础代码大部分代码就已经概况完了

下面是我们的完整项目代码:

from fastapi import APIRouter, Depends
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio.session import AsyncSession
from ferisi2.orm自己小任务.main import get_session, User


orm_router=APIRouter()
@orm_router.get("/")
async def root():
    return {"message": "Hello World"}
@orm_router.get("/add1")
async def add_user(session: AsyncSession = Depends(get_session)):
    user = User(name='xiaoyang',age='21',gender='男')
    session.add(user)
    return user
@orm_router.get("/add2")
async def add_user(name1: str, age1: int, gender1: str, session: AsyncSession = Depends(get_session)):
    user = User(name=name1,age=age1,gender=gender1)
    session.add(user)
    return {name1: user.name, age1: user.age, gender1: user.gender}
@orm_router.post("/drop")
async def drop_user(id: int, session: AsyncSession = Depends(get_session)):
    user = await session.get(User,id)
    if user:
        await session.delete(user)
    else:
        return {"message": "用户不存在"}
    return {"message": "用户删除成功"}
@orm_router.post("/update")
async def update_user(id: int, name1: str, age1: int, gender1: str, session: AsyncSession = Depends(get_session)):
    user = await session.get(User,id)
    if user:
        user.name=name1
        user.age=age1
        user.gender=gender1
    else:
        return {"message": "用户不存在"}
    return {"message": "用户更新成功"}
@orm_router.post("/retrieve1")
async def retrieve(id: int, session: AsyncSession = Depends(get_session)):
    stmt=select(User).where(User.id==id)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list
@orm_router.post("/retrieve2")
async def retrieve( session: AsyncSession = Depends(get_session)):
    stmt= select(User)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list
@orm_router.post("/retrieve3")
async def retrieve(name2: str, session: AsyncSession = Depends(get_session)):
    user=select(User).where(User.name.like(f'%{name2}%'))
    result=await session.execute(user)
    stu_list=result.scalars().all()
    return stu_list
@orm_router.post("/page")
async def page(page: int, size: int, session: AsyncSession = Depends(get_session)):
    stmt=select(User).offset((page-1)*size).limit(size)
    result=await session.execute(stmt)
    stu_list = result.scalars().all()
    return stu_list
@orm_router.post("/tngji")
async def tngji(session: AsyncSession = Depends(get_session)):
    age1=select(func.avg(User.age))
    result=await session.execute(age1)
    stu_list=result.scalars().all()
    return stu_list

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐