一、介绍

1.什么是 SQLAlchemy?

SQLAlchemy 是 Python 的 SQL 工具包和对象关系映射器,是 Python 中最流行的 ORM(对象关系映射)工具,它为应用程序开发人员提供了 SQL 的全部功能和灵活性。它提供了一整套众所周知的企业级持久化模式,旨在实现高效且高性能的数据库访问,并已将其改编为简洁且符合 Python 风格的领域语言。

官方主站: https://www.sqlalchemy.org/

官方文档: https://docs.sqlalchemy.org/en/stable/

官网下载页:https://www.sqlalchemy.org/download.html

GitHub 源码库:https://github.com/sqlalchemy/sqlalchemy(可提 Issue、看源码、贡献代码)

官方 Discourse 论坛:https://discuss.sqlalchemy.org/(提问、交流问题的官方社区)

2.核心优势

  • 异步非阻塞:适配 FastAPI、Starlette 等异步 Web 框架,提升高并发场景性能;
  • API 统一:异步 API 与同步 API 逻辑一致,学习成本低;
  • 跨数据库兼容:支持 SQLite、MySQL、PostgreSQL、Oracle 等主流数据库,切换数据库只需修改连接字符串。
  • 灵活易用:既支持高层的 ORM 操作,也支持底层的原生 SQL 执行。
  • 强大的查询能力:提供丰富的查询 API,支持复杂的筛选、聚合、关联查询。
  • 事务支持:完善的事务管理机制,保证数据操作的原子性。

3.核心组件

  • Engine:数据库连接引擎,负责管理数据库连接池。
  • Session:数据库会话,用于执行 CRUD 操作。
  • Declarative Base:模型基类,所有数据模型都继承该类。
  • Mapper:将 Python 类映射到数据库表。
  • Query:查询对象,用于构建数据库查询。

4.相关单词

单词 音标 中文释义 核心使用场景
SQLAlchemy /ˌeskjuːˈel ˈælkəmi/ SQL 炼金术(ORM 框架名) 框架整体引用、技术栈说明
Session /ˈseʃn/ 会话 数据库连接会话管理(2.0 核心对象)
Model /ˈmɒdl/(英)/ˈmɑːdl/(美) 模型 定义数据库表映射的类
Engine /ˈendʒɪn/ 引擎 数据库连接引擎创建
Query /ˈkwɪəri/(英)/ˈkwɪri/(美) 查询 构建数据库查询语句
Column /ˈkɒləm/(英)/ˈkɑːləm/(美) 定义数据库表字段
Table /ˈteɪbl/ 数据库表对象 / 映射
Relationship /rɪˈleɪʃnʃɪp/ 关系 定义表之间的关联(一对多 / 多对多)
ForeignKey /ˈfɒrən kiː/(英)/ˈfɔːrən kiː/(美) 外键 建立表之间的引用关系
DeclarativeBase /dɪˈklærətɪv beɪs/ 声明式基类 2.0 模型继承的核心基类
Mapped /mæpt/ 映射 2.0 字段类型注解(Mapped [])
mapped_column /mæpt ˈkɒləm/(英)/mæpt ˈkɑːləm/(美) 映射列 2.0 定义字段的核心函数
Select /sɪˈlekt/ 选择 2.0 构建查询的核心语句(select ())
Commit /kəˈmɪt/ 提交 事务提交操作
Rollback /ˈrəʊlbæk/(英)/ˈroʊlbæk/(美) 回滚 事务异常时回滚操作
Filter /ˈfɪltə/(英)/ˈfɪltər/(美) 过滤 查询时添加条件过滤
Join /dʒɔɪn/ 连接 多表关联查询(join/joinedload)
Pagination /ˌpædʒɪˈneɪʃn/ 分页 数据分页查询处理
Transaction /trænˈzækʃn/ 事务 数据库事务管理
Metadata /ˈmetədeɪtə/ 元数据 数据库表结构的元数据对象
Cascade /kæˈskeɪd/ 级联 定义关联数据的级联操作(如删除)
Scalar /ˈskeɪlə/ 标量 查询单个结果(scalar ()/scalars ())
Sessionmaker /ˈseʃn ˌmeɪkə/(英)/ˈseʃn ˌmeɪkər/(美) 会话工厂 创建 Session 的工厂函数
AsyncSession /eɪˈsɪŋk ˈseʃn/ 异步会话 2.0 异步操作的会话对象
Index /ˈɪndeks/ 索引 为表字段创建索引
Unique /juˈniːk/ 唯一 定义唯一约束(UniqueConstraint)

二、环境准备

1.安装 SQLAlchemy

# 安装核心库
pip install sqlalchemy

# 查看已安装的 sqlalchemy 包的详细信息
pip show sqlalchemy

执行 pip show sqlalchemy 后,你会看到类似这样的输出:

Name: SQLAlchemy
Version: 2.0.48
Summary: Database Abstraction Library
Home-page: https://www.sqlalchemy.org
Author: Mike Bayer
Author-email: mike_mp@zzzcomputing.com
License: MIT
Location: D:\Workspaces\python\test\Test01\.venv\Lib\site-packages
Requires: greenlet, typing-extensions
Required-by:

输出字段解释:

字段 含义
Name 包的正式名称(注意 SQLAlchemy 是官方名称,小写 sqlalchemy 是安装名)
Version 已安装的版本号(最常用的字段之一,用于确认版本是否匹配需求)
Summary 包的简短描述(说明这个包是做什么的)
Home-page 包的官方网站 / 文档地址
Author/Author-email 包的作者和联系方式
License 开源许可证类型(如 MIT)
Location 包在本地的安装路径(定位包的具体位置)
Requires 该包依赖的其他包(如 sqlalchemy 依赖 greenlet)
Required-by 哪些已安装的包依赖这个包(如 flask-sqlalchemy 依赖 sqlalchemy)

2.安装数据库驱动

不同数据库需要安装对应的驱动,以下是主流数据库的驱动安装命令:

数据库 同步驱动安装命令 异步驱动安装命令
SQLite 无需安装(Python 内置) 无需安装
MySQL/MariaDB pip install pymysqlmysql-connector-python pip install asyncmy
PostgreSQL pip install psycopg2-binary pip install asyncpg
Oracle pip install cx-Oracle 无官方异步驱动(可使用同步 + 线程)

3.连接字符串格式

连接字符串是 Engine 的核心参数,格式为:数据库类型+驱动://用户名:密码@主机:端口/数据库名?参数

数据库 同步连接字符串示例 异步连接字符串示例
SQLite sqlite:///test.db(相对路径)sqlite:////绝对路径/test.db sqlite+aiosqlite:///test.db
MySQL mysql+pymysql://root:123456@localhost:3306/test mysql+asyncmy://root:123456@localhost:3306/test
PostgreSQL postgresql+psycopg2://postgres:123456@localhost:5432/test postgresql+asyncpg://postgres:123456@localhost:5432/test
Oracle oracle+cx_oracle://scott:tiger@127.0.0.1:1521/orcl 无(需用同步驱动 + 异步封装)

说明

  • SQLite 无需用户名 / 密码,/// 表示相对路径,//// 表示绝对路径;
  • 异步连接字符串需在数据库类型后加 +异步驱动名(如 aiosqliteasyncmy);
  • 可添加参数(如 charset=utf8mb4ssl=true)解决编码 / 安全问题。

三、快速入门(以 SQLite 为例)

1.创建数据库连接

1.1.同步连接

在根目录下,创建 sync_session.py 文件:

# 导入核心模块(2.0+ 推荐的导入方式)
from typing import Generator, Optional

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase, Session
from sqlalchemy.pool import QueuePool  # 连接池配置

# ====================== 1. 数据库引擎优化(核心性能优化) ======================
# 针对 SQLite 和 SQLAlchemy 2.0+ 的引擎配置优化
# 关键优化点:
# - 关闭 echo(生产环境必须关闭,调试时可开启)
# - 配置连接池(SQLite 虽无真正连接池,但配置可兼容其他数据库)
# - 设置 SQLite 专属优化参数(提升读写性能)
sync_engine = create_engine(
    "sqlite:///sync_test.db",
    # 基础配置
    echo=True,  # 生产环境关闭 SQL 打印,提升性能
    echo_pool=False,  # 关闭连接池日志
    # 连接池配置(适配 MySQL/PostgreSQL 等数据库时更重要)
    poolclass=QueuePool,
    pool_size=5,  # 核心连接数
    max_overflow=10,  # 最大溢出连接数
    pool_recycle=3600,  # 1小时回收连接,避免失效
    pool_pre_ping=True,  # 获取连接前检测是否有效,防止死连接
    # SQLite 专属优化参数(2.0+ 推荐)
    connect_args={
        "check_same_thread": False,  # 允许多线程访问(SQLite 必要配置)
        "timeout": 30,  # 数据库锁定超时时间(默认5秒,提升到30秒避免锁死)
    },
)


# ====================== 2. 模型基类优化(2.0+ 规范) ======================
# SQLAlchemy 2.0+ 推荐直接继承 DeclarativeBase(替代旧的 declarative_base())
class Base(DeclarativeBase):
    """所有数据模型的基类(2.0+ 规范写法)"""
    __abstract__ = True  # 标记为抽象类,不会生成数据库表


# ====================== 3. 会话工厂优化(安全性+规范性) ======================
# 2.0+ 推荐显式指定类型,同时保留核心配置
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=sync_engine,
    expire_on_commit=False,  # 优化:提交后不自动过期对象,提升查询性能
    class_=Session,  # 显式指定会话类(2.0+ 规范)
)


# ====================== 4. 会话获取函数优化(类型安全+容错性) ======================
def get_db() -> Generator[Session, None, None]:
    """
    获取数据库会话生成器(2.0+ 类型安全版本)
    特性:
    - 类型注解明确,支持 IDE 提示
    - 异常捕获并友好提示
    - 确保会话最终关闭
    """
    db_session: Optional[Session] = None  # 初始化变量
    try:
        db_session = SessionLocal()
        yield db_session
    except Exception as e:
        # 可选:记录日志(推荐生产环境添加)
        # import logging; logging.error(f"数据库会话异常: {str(e)}")
        db_session.rollback()  # 异常时回滚未提交的操作
        raise  # 重新抛出异常,让上层处理
    finally:
        if db_session is not None:
            db_session.close()  # 确保会话无论是否异常都关闭


if __name__ == "__main__":
    # 测试示例
    db = next(get_db())
    print(db)  # <sqlalchemy.orm.session.Session object at 0x00000297EFE323C0>

1.2.异步连接

在根目录下,创建 async_session.py 文件:

import asyncio
from typing import AsyncGenerator, Optional

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine  # 显式导入类型
)
from sqlalchemy.orm import DeclarativeBase  # 2.0+ 推荐的基类
from sqlalchemy.pool import AsyncAdaptedQueuePool  # 异步连接池

# ====================== 1. 配置项抽离(提升可维护性) ======================
# 数据库连接配置(统一管理,方便环境切换)
DATABASE_URL = "sqlite+aiosqlite:///async_test.db"
# 调试开关(建议通过环境变量控制,如 os.getenv("DEBUG", "False") == "True")
DEBUG_MODE = True

# ====================== 2. 异步引擎优化(核心性能+兼容性) ======================
async_engine: AsyncEngine = create_async_engine(
    url=DATABASE_URL,
    # 基础配置
    echo=DEBUG_MODE,  # 生产环境关闭,调试时开启
    echo_pool=False,  # 关闭连接池日志(减少IO开销)
    # 异步连接池配置(2.0+ 推荐 AsyncAdaptedQueuePool)
    poolclass=AsyncAdaptedQueuePool,
    pool_size=5,  # 核心连接数(SQLite 无实际连接,适配其他数据库)
    max_overflow=10,  # 最大溢出连接数
    pool_recycle=3600,  # 1小时回收连接,避免失效(适配MySQL/PostgreSQL)
    pool_pre_ping=True,  # 获取连接前检测有效性,防止死连接
    # SQLite 专属异步优化参数
    connect_args={
        "check_same_thread": False,  # 解决SQLite线程安全问题
        "timeout": 30,  # 数据库锁定超时时间(默认5秒,提升稳定性)
    },
)

# ====================== 3. 异步会话工厂优化(2.0+ 规范) ======================
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,  # 显式指定异步会话类(2.0+ 规范)
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,  # 提交后不失效对象(异步场景减少重复查询)
)


# ====================== 4. 模型基类优化(2.0+ 官方推荐) ======================
# 替代旧的 declarative_base(),2.0+ 推荐直接继承 DeclarativeBase
class Base(DeclarativeBase):
    """所有异步模型的基类(2.0+ 规范写法)"""
    __abstract__ = True  # 标记为抽象类,不生成数据库表


# ====================== 5. 异步会话获取函数(核心优化,解决原逻辑问题) ======================
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """
    获取异步数据库会话生成器(适配 FastAPI 依赖注入)
    优化点:
    1. 移除 async with 与手动 close() 的冲突
    2. 取消自动提交(避免业务逻辑未完成时误提交)
    3. 完善异常处理和类型注解
    4. 确保会话最终关闭,防止连接泄漏
    """
    session: Optional[AsyncSession] = None
    try:
        session = AsyncSessionLocal()  # 手动创建会话,替代 async with
        yield session  # 传递会话给业务逻辑
    except Exception as e:
        # 异常时回滚所有未提交的操作
        if session:
            await session.rollback()
        raise e  # 重新抛出异常,让上层框架(如FastAPI)处理
    finally:
        # 确保会话最终关闭,释放连接
        if session:
            await session.close()


# ====================== 6. 可选:创建数据库表的异步函数(通用工具) ======================
async def create_all_tables() -> None:
    """异步创建所有模型对应的数据库表(初始化用)"""
    async with async_engine.begin() as conn:
        # 异步执行表创建(2.0+ 推荐使用 engine.begin())
        await conn.run_sync(Base.metadata.create_all)


# 测试示例(可选)
async def main():
    # 初始化数据库表
    # await create_all_tables()
    # 使用会话
    async for db in get_async_db():
        # 业务逻辑示例
        print(f"会话状态:{db.is_active}")  # 输出: 会话状态:True


if __name__ == "__main__":
    asyncio.run(main())

2.定义数据模型(映射数据库表)

2.1.定义模型

数据模型类对应数据库表,类属性对应表字段,异步模型定义与同步版完全一致,异步版仅影响 “操作方式”,不影响 “模型结构”

在根目录下,创建 models.py 文件:

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Index
from sqlalchemy.sql import func  # 2.0+ 推荐的函数工具

# 注意:Base 需从之前优化后的 DeclarativeBase 子类导入
# 导入 Base 根据同步/异步场景选择
# from async_session import Base
from sync_session import Base


# ====================== 用户表模型(2.0+ 规范写法) ======================
class User(Base):
    """
    用户表模型
    表名:users
    核心字段:ID、用户名、邮箱、年龄、激活状态、创建/更新时间
    """
    __tablename__ = "users"

    # 1. 核心字段优化(2.0+ 规范 + 约束增强)
    id = Column(
        Integer,
        primary_key=True,
        index=True,
        comment="用户ID(自增主键)",
        autoincrement=True  # 显式声明自增(兼容多数据库)
    )
    username = Column(
        String(50),
        unique=True,
        nullable=False,
        comment="用户名(唯一,非空)",
        index=True  # 高频查询字段加索引,提升查询性能
    )
    email = Column(
        String(100),
        unique=True,
        nullable=True,
        comment="邮箱(唯一,可选)",
        index=True  # 邮箱查询场景多,添加索引
    )
    age = Column(
        Integer,
        default=0,
        nullable=False,  # 显式声明非空(默认值已保证,增强约束)
        comment="年龄(默认0)"
    )
    is_active = Column(
        Boolean,
        default=True,
        nullable=False,
        comment="是否激活(默认True)",
        index=True  # 状态字段高频过滤,添加索引
    )
    # 时间字段优化:使用 func.now() 替代 datetime.now(数据库层面生成时间,兼容异步)
    create_time = Column(
        DateTime,
        default=func.now(),  # 2.0+ 推荐,数据库服务器时间(更精准)
        nullable=False,
        comment="创建时间(自动生成)"
    )
    # 新增更新时间字段(业务常用,2.0+ 支持 onupdate)
    update_time = Column(
        DateTime,
        default=func.now(),
        onupdate=func.now(),  # 数据更新时自动刷新
        nullable=False,
        comment="更新时间(自动更新)"
    )

    # 2. 复合索引(针对多字段查询场景,提升性能)
    __table_args__ = (
        # 示例:按「激活状态+创建时间」查询的复合索引
        Index("idx_user_active_create", "is_active", "create_time"),
    )

    # 3. 自定义方法优化(类型注解 + 实用方法)
    def __repr__(self) -> str:
        """自定义打印格式(类型注解增强)"""
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}', is_active={self.is_active})>"

    def to_dict(self) -> dict:
        """新增:模型转字典(业务常用,避免手动序列化)"""
        return {
            "id": self.id,
            "username": self.username,
            "email": self.email,
            "age": self.age,
            "is_active": self.is_active,
            "create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S") if self.create_time else None,
            "update_time": self.update_time.strftime("%Y-%m-%d %H:%M:%S") if self.update_time else None,
        }

字段类型说明

  • Integer:整型(对应数据库 INT);
  • String(n):字符串型(对应 VARCHAR (n));
  • DateTime:日期时间型(对应 DATETIME);
  • Boolean:布尔型(对应 BOOLEAN/INT)。

约束说明

  • primary_key=True:主键;
  • unique=True:唯一约束;
  • nullable=False:非空约束;
  • default=值:默认值;
  • index=True:创建索引(提升查询速度)。

2.2.生成表结构

在根目录下,创建 tables.py 文件:

# 注意:Base 需从之前优化后的 DeclarativeBase 子类导入
# 导入 Base 根据同步/异步场景选择
# from session_async import Base, async_engine
from session_sync import Base, sync_engine

# 导入数据模型(必须在创建表之前导入,这样 SQLAlchemy 才能知道要创建哪些表)
from models import User


# ====================== 表创建逻辑优化(兼容同步/异步) ======================
# 1. 同步场景创建表(2.0+ 规范写法)
def create_tables_sync(engine) -> None:
    """同步创建所有表(仅初始化时执行)"""
    # checkfirst=True:默认值,先检查表是否存在,避免重复创建
    Base.metadata.create_all(bind=engine, checkfirst=True)


# 2. 异步场景创建表(2.0+ 推荐写法)
async def create_tables_async(engine) -> None:
    """异步创建所有表(适配异步引擎)"""
    async with engine.begin() as conn:
        # run_sync 适配异步引擎执行同步的 metadata.create_all
        await conn.run_sync(Base.metadata.create_all, checkfirst=True)


# ====================== 调用示例 ======================
# 同步场景
create_tables_sync(sync_engine)

# # 异步场景(需在异步上下文执行)
# import asyncio
# asyncio.run(create_tables_async(async_engine))

2.3.查看生成结果

同步生成 sync_test.db

在这里插入图片描述

异步生成 async_test.db

在这里插入图片描述

3.增删改查(CRUD)

3.1.同步操作

在根目录下,创建 sync_crud.py 文件:

3.1.1.新增数据(Create)
from typing import List, Optional, Sequence

from sqlalchemy import select, update, delete, desc
from sqlalchemy.orm import Session

# 假设 Base、User 模型、get_db 已从优化后的配置文件导入
from models import User
from sync_session import get_db


# ====================== 通用工具函数(提升复用性) ======================
def get_db_session() -> Session:
    """获取数据库会话(封装 get_db,简化调用)"""
    return next(get_db())


# ====================== 3.1.1 新增数据(Create)- 2.0+ 优化 ======================
def create_users():
    """新增数据(单个/批量)- 2.0+ 规范写法"""
    db: Session = get_db_session()
    try:
        # 方式1:创建单个对象(添加类型注解)
        user1: User = User(username="zhangsan", email="zhangsan@example.com", age=20)
        db.add(user1)

        # 方式2:批量创建对象(类型注解 + 列表初始化)
        batch_users: List[User] = [
            User(username="lisi", email="lisi@example.com", age=22),
            User(username="wangwu", email="wangwu@example.com", age=25)
        ]
        db.add_all(batch_users)

        # 提交事务(2.0+ 推荐先 flush 再 commit,确保获取自增ID)
        db.flush()  # 预提交,生成ID但不持久化
        db.commit()  # 最终提交

        # 刷新对象(获取数据库自动生成的字段)
        db.refresh(user1)
        print(f"新增用户ID:{user1.id}")  # 输出:新增用户ID:1
    except Exception as e:
        db.rollback()  # 异常回滚
        raise RuntimeError(f"新增用户失败:{str(e)}") from e
    finally:
        db.close()  # 确保会话关闭

查询 users 表所有记录:

在这里插入图片描述

3.1.2 查询数据(Read)
# ====================== 3.1.2 查询数据(Read)- 2.0+ 核心优化 ======================
def query_users():
    """查询数据 - 2.0+ 推荐使用 select() 构造器(替代旧 query API)"""
    db: Session = get_db_session()
    try:
        # 1. 查询所有用户
        stmt = select(User)
        users: Sequence[User] = db.scalars(stmt).all()
        print("所有用户:", users)   # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]

        # 2. 查询单个用户(按主键)
        user: Optional[User] = db.get(User, 1)  # 2.0+ 推荐直接用 session.get()
        print("主键为1的用户:", user)   # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>

        # 3. 条件查询
        # 方式1:filter(2.0+ 推荐,支持复杂条件)
        stmt = select(User).where(User.username == "zhangsan")
        user: Optional[User] = db.scalars(stmt).first()
        print("方式1:用户名是zhangsan的用户:", user)   # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>

        # 方式2:简易条件(2.0+ 无 filter_by,用 where 简化)
        stmt = select(User).where(User.username == "zhangsan")
        user: Optional[User] = db.scalars(stmt).first()
        print("方式2:用户名是zhangsan的用户:", user)   # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>

        # 4. 多条件查询
        stmt = select(User).where(User.age > 20, User.is_active == True)
        users: Sequence[User] = db.scalars(stmt).all()
        print("年龄>20且激活的用户:", users)   # 输出: [<User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]

        # 5. 排序查询(2.0+ 用 desc()/asc() 函数)
        stmt = select(User).order_by(desc(User.age))
        users: Sequence[User] = db.scalars(stmt).all()
        print("按年龄降序的用户:", users)   # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>]

        # 6. 限制查询结果数量(2.0+ 用 limit() 方法)
        stmt = select(User).limit(2)
        users: Sequence[User] = db.scalars(stmt).all()
        print("前2个用户:", users)   # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>]
    finally:
        db.close()
3.1.3 修改数据(Update)
# ====================== 3.1.3 修改数据(Update)- 2.0+ 优化 ======================
def update_users():
    """修改数据(单个/批量)- 2.0+ 规范写法"""
    db: Session = get_db_session()
    try:
        # 1. 修改单个对象(查询-修改-提交)
        stmt = select(User).where(User.username == "zhangsan")
        user: Optional[User] = db.scalars(stmt).first()
        if user:
            # 修改属性
            user.age = 21
            user.email = "zhangsan_new@example.com"
            db.commit()  # 提交修改
            db.refresh(user)  # 刷新对象获取最新数据
            print("修改后的用户:", user)  # 输出: <User(id=1, username='zhangsan', email='zhangsan_new@example.com', is_active=True)>

        # 2. 批量修改(2.0+ 新 API:update() 构造器)
        stmt = (
            update(User)
            .where(User.age < 25)
            .values(is_active=False)  # 2.0+ 用 values() 指定修改字段
            .execution_options(synchronize_session="fetch")  # 同步会话数据
        )
        result = db.execute(stmt)
        db.commit()
        # 输出: 批量修改完成,影响行数:2
        print(f"批量修改完成,影响行数:{result.rowcount}")  # type: ignore[attr-defined]
    except Exception as e:
        db.rollback()
        raise RuntimeError(f"修改用户失败:{str(e)}") from e
    finally:
        db.close()
3.1.4 删除数据(Delete)
# ====================== 3.1.4 删除数据(Delete)- 2.0+ 优化 ======================
def delete_users():
    """删除数据(单个/批量)- 2.0+ 规范写法"""
    db: Session = get_db_session()
    try:
        # 1. 删除单个对象
        stmt = select(User).where(User.username == "wangwu")
        user: Optional[User] = db.scalars(stmt).first()
        if user:
            db.delete(user)
            db.commit()
            print("删除用户完成")

        # 2. 批量删除(2.0+ 新 API:delete() 构造器)
        stmt = (
            delete(User)
            .where(User.is_active == False)
            .execution_options(synchronize_session="fetch")
        )
        result = db.execute(stmt)
        db.commit()
        # 输出: 批量删除完成,影响行数:2
        print(f"批量删除完成,影响行数:{result.rowcount}")  # type: ignore[attr-defined]
    except Exception as e:
        db.rollback()
        raise RuntimeError(f"删除用户失败:{str(e)}") from e
    finally:
        db.close()


# ====================== 调用示例 ======================
if __name__ == "__main__":
    # 执行新增
    # create_users()
    # 执行查询
    # query_users()
    # 执行修改
    # update_users()
    # 执行删除
    delete_users()

3.2.异步操作

在根目录下,创建 async_crud.py 文件:

3.2.1.新增数据(Create)
import asyncio
from typing import List, Optional

from sqlalchemy import select, update, delete, desc, Sequence
from sqlalchemy.ext.asyncio import AsyncSession

# 假设从优化后的异步配置文件导入以下依赖
from models import User


# ====================== 通用工具函数(异步) ======================
async def get_async_db_session() -> AsyncSession:
    """获取异步数据库会话(封装 get_async_db,简化调用)"""
    # 由于 get_async_db 现在是上下文管理器,我们需要一个不同的实现
    # 直接创建会话而不是使用上下文管理器
    from async_session import AsyncSessionLocal

    session = AsyncSessionLocal()
    return session


# ====================== 3.1.1 新增数据(Create)- 异步版 ======================
async def create_users_async():
    """异步新增数据(单个/批量)"""
    db: AsyncSession = await get_async_db_session()
    try:
        # 方式1:创建单个对象
        user1: User = User(username="zhangsan", email="zhangsan@example.com", age=20)
        db.add(user1)

        # 方式2:批量创建对象
        batch_users: List[User] = [
            User(username="lisi", email="lisi@example.com", age=22),
            User(username="wangwu", email="wangwu@example.com", age=25)
        ]
        db.add_all(batch_users)

        # 异步提交(2.0+ 异步必须用 await)
        await db.flush()  # 预提交,生成自增ID
        await db.commit()  # 最终提交

        # 异步刷新对象
        await db.refresh(user1)
        print(f"新增用户ID:{user1.id}")  # 输出:新增用户ID:1
    except Exception as e:
        await db.rollback()  # 异步回滚
        raise RuntimeError(f"异步新增用户失败:{str(e)}") from e
    finally:
        await db.close()  # 异步关闭会话

查询 users 表所有记录:

在这里插入图片描述

3.2.2 查询数据(Read)
# ====================== 3.1.2 查询数据(Read)- 异步版 ======================
async def query_users_async():
    """异步查询数据(2.0+ 异步 select 构造器)"""
    db: AsyncSession = await get_async_db_session()
    try:
        # 1. 查询所有用户(异步核心:await + scalars + all)
        stmt = select(User)
        result = await db.scalars(stmt)  # 异步执行查询
        users: Sequence[User] = result.all()
        print("所有用户:",
              users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]

        # 2. 查询单个用户(按主键,异步 get)
        user: Optional[User] = await db.get(User, 1)  # 异步 get 方法
        print("主键为1的用户:",
              user)  # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>

        # 3. 条件查询
        # 方式1:where 条件 + 异步执行
        stmt = select(User).where(User.username == "zhangsan")
        user: Optional[User] = (await db.scalars(stmt)).first()
        print("方式1:用户名是zhangsan的用户:",
              user)  # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>

        # 4. 多条件查询
        stmt = select(User).where(User.age > 20, User.is_active == True)
        users: Sequence[User] = (await db.scalars(stmt)).all()
        print("年龄>20且激活的用户:",
              users)  # 输出: [<User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]

        # 5. 排序查询(2.0+ desc 函数)
        stmt = select(User).order_by(desc(User.age))
        users: Sequence[User] = (await db.scalars(stmt)).all()
        print("按年龄降序的用户:",
              users)  # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>]

        # 6. 限制查询结果数量
        stmt = select(User).limit(2)
        users: Sequence[User] = (await db.scalars(stmt)).all()
        print("前2个用户:",
              users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>]
    finally:
        await db.close()
3.2.3 修改数据(Update)
# ====================== 3.1.3 修改数据(Update)- 异步版 ======================
async def update_users_async():
    """异步修改数据(单个/批量)"""
    db: AsyncSession = await get_async_db_session()
    try:
        # 1. 修改单个对象(查询-修改-提交)
        stmt = select(User).where(User.username == "zhangsan")
        user: Optional[User] = (await db.scalars(stmt)).first()
        if user:
            user.age = 21
            user.email = "zhangsan_new@example.com"
            await db.commit()  # 异步提交
            await db.refresh(user)  # 异步刷新
            print("修改后的用户:",
                  user)  # 输出: <User(id=1, username='zhangsan', email='zhangsan_new@example.com', is_active=True)>

        # 2. 批量修改(2.0+ 异步 update 构造器)
        stmt = (
            update(User)
            .where(User.age < 25)
            .values(is_active=False)
            .execution_options(synchronize_session="fetch")
        )
        result = await db.execute(stmt)  # 异步执行批量更新
        await db.commit()
        # 输出: 批量修改完成,影响行数:2
        print(f"批量修改完成,影响行数:{result.rowcount}")  # type: ignore[attr-defined]
    except Exception as e:
        await db.rollback()
        raise RuntimeError(f"异步修改用户失败:{str(e)}") from e
    finally:
        await db.close()
3.2.4 删除数据(Delete)
# ====================== 3.1.4 删除数据(Delete)- 异步版 ======================
async def delete_users_async():
    """异步删除数据(单个/批量)"""
    db: AsyncSession = await get_async_db_session()
    try:
        # 1. 删除单个对象
        stmt = select(User).where(User.username == "wangwu")
        user: Optional[User] = (await db.scalars(stmt)).first()
        if user:
            await db.delete(user)  # 异步删除
            await db.commit()
            print("删除用户完成")

        # 2. 批量删除(2.0+ 异步 delete 构造器)
        stmt = delete(User).where(User.is_active == False)
        result = await db.execute(stmt)  # 异步执行批量删除
        await db.commit()
        # 输出: 批量删除完成,影响行数:2
        print(f"批量删除完成,影响行数:{result.rowcount}")  # type: ignore[attr-defined]
    except Exception as e:
        await db.rollback()
        raise RuntimeError(f"异步删除用户失败:{str(e)}") from e
    finally:
        await db.close()


# ====================== 异步入口函数(执行所有操作) ======================
async def main():
    """异步主函数:按顺序执行所有 CRUD 操作"""
    # 1. 新增数据
    # await create_users_async()
    # 2. 查询数据
    # await query_users_async()
    # 3. 修改数据
    # await update_users_async()
    # 4. 删除数据
    await delete_users_async()


# ====================== 执行异步代码 ======================
if __name__ == "__main__":
    # 异步代码必须在 asyncio 事件循环中执行
    asyncio.run(main())

3.3.同步 vs 异步核心差异

3.3.1.核心语法差异(最直观)
维度 同步操作 异步操作 关键说明
函数定义 def func(): async def func(): 所有异步操作函数必须标记 async
操作执行 直接调用(如 db.commit() 需加 await(如 await db.commit() 所有数据库 IO 操作必须用 await 挂起
入口执行 直接调用 func() asyncio.run(func()) 异步代码必须在事件循环中执行
会话获取 next(get_db()) await get_async_db_session() 异步生成器需 await + 循环获取
3.3.2.核心 API 差异(2.0+ 重点)
同步 API(旧 / 2.0 兼容) 异步 API(2.0+ 推荐) 适用场景
Session AsyncSession 会话对象类型
db.get(User, 1) await db.get(User, 1) 按主键查询
db.scalars(stmt).all() (await db.scalars(stmt)).all() 通用查询
db.execute(stmt) await db.execute(stmt) 执行 update/delete 语句
db.commit() await db.commit() 提交事务
db.rollback() await db.rollback() 回滚事务
db.delete(user) await db.delete(user) 删除单个对象
db.close() await db.close() 关闭会话
3.3.3.底层依赖 / 配置差异
配置项 同步版本 异步版本
引擎创建 create_engine() create_async_engine()
会话工厂 sessionmaker() async_sessionmaker()
SQLite 驱动 内置 sqlite3 需安装 aiosqlitepip install aiosqlite
MySQL/PG 驱动 pymysql/psycopg2 asyncmy/psycopg[async]
连接池 QueuePool AsyncAdaptedQueuePool
3.3.4.执行逻辑差异
特性 同步执行 异步执行
阻塞方式 线程阻塞(等待数据库响应) 协程挂起(不阻塞线程,可处理其他任务)
并发能力 受线程数限制(GIL 影响) 高并发(单线程可处理上千协程)
异常处理 try-except 直接捕获 try-except 捕获 + await 内执行
事务控制 同步 flush/commit/rollback 异步 flush/commit/rollback(均需 await)

四、进阶技巧(同步版)

1.高级查询

异步版高级查询语法与同步版一致,仅需在执行时添加 await,在根目录下创建 sync_query.py 文件:

1.1.定义模型

from typing import Tuple

from sqlalchemy import (
    Column, Integer, String, DateTime, Boolean,
    or_, and_, not_, func, select, distinct, Sequence
)
from sqlalchemy.orm import Session, DeclarativeBase

from sync_crud import get_db_session
from sync_session import sync_engine


# ====================== 基础模型定义(2.0+ 规范) ======================
class Base(DeclarativeBase):
    __abstract__ = True


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="用户ID")
    username = Column(String(50), unique=True, nullable=False, index=True, comment="用户名")
    email = Column(String(100), unique=True, nullable=True, index=True, comment="邮箱")
    age = Column(Integer, default=0, nullable=False, comment="年龄")
    is_active = Column(Boolean, default=True, nullable=False, index=True, comment="是否激活")
    create_time = Column(DateTime, default=func.now(), nullable=False, comment="创建时间")

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"


class Article(Base):
    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="文章ID")
    title = Column(String(100), nullable=False, comment="文章标题")
    content = Column(String(500), comment="文章内容")
    user_id = Column(Integer, nullable=False, index=True, comment="关联用户ID")
    create_time = Column(DateTime, default=func.now(), nullable=False, comment="创建时间")

    def __repr__(self) -> str:
        return f"<Article(id={self.id}, title='{self.title}', user_id={self.user_id})>"


# ====================== 1.1 初始化表和测试数据 ======================
def init_data():
    """初始化测试数据(仅首次执行)"""
    db: Session = get_db_session()
    try:
        # 创建表
        print("正在创建数据库表...")
        Base.metadata.create_all(bind=sync_engine)
        print("表创建完成")

        # 清空旧数据(避免重复)
        print("正在清空旧数据...")
        db.query(User).delete()
        db.query(Article).delete()
        db.commit()
        print("旧数据已清空")

        # 新增用户
        print("正在新增用户...")
        users = [
            User(username="zhangsan", email="zhangsan@example.com", age=20),
            User(username="lisi", email="lisi@example.com", age=22),
            User(username="wangwu", email="wangwu@example.com", age=25),
            User(username="zhaoliu", email=None, age=30)  # type: ignore[arg-type]
        ]
        db.add_all(users)
        db.commit()
        print(f"用户新增完成,共 {len(users)} 个用户")

        # 新增文章
        print("正在新增文章...")
        articles = [
            Article(title="Python 入门", content="SQLAlchemy 学习", user_id=1),
            Article(title="SQLAlchemy 进阶", content="关联查询", user_id=1),
            Article(title="MySQL 优化", content="索引使用", user_id=2)
        ]
        db.add_all(articles)
        db.commit()
        print(f"文章新增完成,共 {len(articles)} 篇文章")

        print("\n=== 测试数据初始化完成 ===\n")
    except Exception as e:
        db.rollback()
        error_msg = f"初始化数据失败:{str(e)}"
        print(error_msg)
        import traceback
        traceback.print_exc()
        raise RuntimeError(error_msg) from e
    finally:
        db.close()

1.2.筛选查询

除基础条件外,SQLAlchemy 支持丰富的筛选操作:

# ====================== 1.2 筛选查询(2.0+ 优化版) ======================
def filter_query():
    """筛选查询(2.0+ select 构造器)"""
    db: Session = get_db_session()
    try:
        # 1. 或条件(or_)
        stmt = select(User).where(or_(User.age == 20, User.username == "lisi"))
        users: Sequence[User] = db.scalars(stmt).all()
        print("或条件(or_): ", users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]

        # 2. 与条件(and_)
        stmt = select(User).where(and_(User.age > 20, User.is_active == True))
        users = db.scalars(stmt).all()
        print("与条件(and_): ", users)  # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]

        # 3. 非条件(not_)
        stmt = select(User).where(not_(User.age == 20))
        users = db.scalars(stmt).all()
        print("非条件(not_): ", users)  # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]

        # 4. 模糊查询(like)
        # 以zhang开头
        stmt = select(User).where(User.username.like("zhang%"))
        users = db.scalars(stmt).all()
        print("模糊查询(以zhang开头): ", users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>]
        # 包含si
        stmt = select(User).where(User.username.like("%si%"))
        users = db.scalars(stmt).all()
        print("模糊查询(包含si): ", users)  # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>]

        # 5. 范围查询(in_)
        stmt = select(User).where(User.age.in_([20, 21, 22]))
        users = db.scalars(stmt).all()
        print("范围查询(in_): ", users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]

        # 6. 空值查询
        # 邮箱为空
        stmt = select(User).where(User.email.is_(None))
        users = db.scalars(stmt).all()
        print("空值查询(is_): ", users)  # 输出: [<User(id=4, username='zhaoliu', email='None')>]
        # 邮箱不为空
        stmt = select(User).where(User.email.isnot(None))
        users = db.scalars(stmt).all()
        print("空值查询(isnot): ", users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>]
    finally:
        db.close()

1.2.聚合查询

使用 func 实现计数、求和、平均值等聚合操作:

# ====================== 1.3 聚合查询(2.0+ 优化版) ======================
def aggregate_query():
    """聚合查询(2.0+ func 优化)"""
    db: Session = get_db_session()
    try:
        # 1. 计数(推荐用 scalar() 获取单个值)
        stmt = select(func.count(User.id))
        total_users: int = db.scalar(stmt)
        print("总用户数:", total_users)  # 输出: 4

        # 2. 求和
        stmt = select(func.sum(User.age))
        total_age: int = db.scalar(stmt)
        print("年龄总和:", total_age)  # 输出: 97

        # 3. 平均值(转换为浮点数,避免精度丢失)
        stmt = select(func.avg(User.age))
        avg_age_value = db.scalar(stmt)
        avg_age: float = float(avg_age_value) if avg_age_value is not None else 0.0
        # 保留 2 位小数
        print("平均年龄:", round(avg_age, 2))  # 输出: 24.25

        # 4. 最大值/最小值
        stmt_max = select(func.max(User.age))
        max_age: int = db.scalar(stmt_max)
        stmt_min = select(func.min(User.age))
        min_age: int = db.scalar(stmt_min)
        print("最大年龄:", max_age, "最小年龄:", min_age)  # 输出: 最大年龄: 30 最小年龄: 20

        # 5. 分组聚合(按激活状态分组)
        stmt = select(User.is_active, func.count(User.id)).group_by(User.is_active)
        result: Sequence[Tuple[bool, int]] = db.execute(stmt).all()
        print("按激活状态分组:", result)  # 输出: [(True, 4)]
    finally:
        db.close()

1.4.关联查询

以「用户 - 文章」一对多为列

# ====================== 1.4 关联查询(2.0+ 优化版) ======================
def join_query():
    """关联查询(2.0+ join/outerjoin 规范写法)"""
    db: Session = get_db_session()
    try:
        # 1. 内连接(INNER JOIN):查询用户及其文章
        # 生成的SQL:SELECT users.id, users.username, users.email, users.age, users.is_active, users.create_time, articles.id AS id_1, articles.title, articles.content, articles.user_id, articles.create_time AS create_time_1
        # FROM users JOIN articles ON users.id = articles.user_id
        stmt = select(User, Article).join(Article, User.id == Article.user_id)
        result: Sequence[Tuple[User, Article]] = db.execute(stmt).all()
        print("\n内连接结果:")
        for user, article in result:  # type: ignore[misc]
            print(f"用户:{user.username},文章:{article.title}")
            # 输出:
            # 用户:zhangsan,文章:Python 入门
            # 用户:zhangsan,文章:SQLAlchemy 进阶
            # 用户:lisi,文章:MySQL 优化

        # 2. 左连接(LEFT JOIN):查询所有用户(包括无文章的)
        # 生成的SQL:SELECT users.id, users.username, users.email, users.age, users.is_active, users.create_time, articles.id AS id_1, articles.title, articles.content, articles.user_id, articles.create_time AS create_time_1
        # FROM users LEFT OUTER JOIN articles ON users.id = articles.user_id
        stmt = select(User, Article).outerjoin(Article, User.id == Article.user_id)
        result = db.execute(stmt).all()
        print("\n左连接结果:")
        for user, article in result:  # type: ignore[misc]
            print(f"用户:{user.username},文章:{article.title if article else '无'}")
            # 输出:
            # 用户:zhangsan,文章:Python 入门
            # 用户:zhangsan,文章:SQLAlchemy 进阶
            # 用户:lisi,文章:MySQL 优化
            # 用户:wangwu,文章:无
            # 用户:zhaoliu,文章:无

    finally:
        db.close()

1.5.子查询

子查询是嵌套在主查询中的查询,示例:

# ====================== 1.5 子查询(2.0+ 优化版) ======================
def sub_query():
    """子查询(2.0+ scalar_subquery/subquery 规范)"""
    db: Session = get_db_session()
    try:
        # 示例1:查询有文章的用户
        # 子查询:所有发布过文章的用户ID(去重)
        subquery = select(distinct(Article.user_id)).scalar_subquery()
        # 主查询
        stmt = select(User).where(User.id.in_(subquery))
        users = db.scalars(stmt).all()
        print("\n发布过文章的用户:", users)  # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]

        # 示例2:查询年龄大于平均年龄的用户
        # 子查询:平均年龄
        subquery = select(func.avg(User.age)).scalar_subquery()
        # 主查询
        stmt = select(User).where(User.age > subquery)
        users = db.scalars(stmt).all()
        print("年龄大于平均年龄的用户:", users)  # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]

        # 示例3:查询发布过2篇以上文章的用户
        # 子查询:统计用户文章数(>2)
        subquery = (
            select(Article.user_id, func.count(Article.id).label("article_count"))
            .group_by(Article.user_id)
            .having(func.count(Article.id) > 2)
            .subquery()
        )
        # 主查询:关联子查询
        stmt = select(User).join(subquery, User.id == subquery.c.user_id)
        users = db.scalars(stmt).all()
        print("发布过2篇以上文章的用户:", users)  # 输出: []
    finally:
        db.close()

1.6.分页查询

分页是业务中常用的功能,通过 offset(偏移量)和 limit(数量)实现:

# ====================== 1.6 分页查询(2.0+ 优化版) ======================
def pagination_query():
    """分页查询(2.0+ 规范写法,带总页数计算)"""
    db: Session = get_db_session()
    try:
        # 分页参数
        page = 1  # 当前页
        page_size = 2  # 每页条数
        offset = (page - 1) * page_size

        # 1. 分页查询数据
        stmt = select(User).offset(offset).limit(page_size).order_by(User.id)
        users: Sequence[User] = db.scalars(stmt).all()
        print(f"\n第{page}页用户:", users)  # 输出: 第1页用户: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]

        # 2. 计算总条数和总页数(优化:一次查询获取总条数)
        total_count_stmt = select(func.count(User.id))
        total_users: int = db.scalar(total_count_stmt)
        total_pages = (total_users + page_size - 1) // page_size  # 向上取整
        print(f"总用户数:{total_users},总页数:{total_pages}")  # 输出: 总用户数:4,总页数:2

        # 扩展:分页通用函数
        def paginate_query(query_stmt, page_num: int, page_size_num: int):  # type: ignore[no-untyped-def]
            """通用分页函数"""
            offset_num = (page_num - 1) * page_size_num
            # 查询数据
            query_data = db.scalars(query_stmt.offset(offset_num).limit(page_size_num)).all()
            # 查询总条数
            query_total = db.scalar(select(func.count()).select_from(query_stmt.subquery()))
            query_total_pages = (query_total + page_size_num - 1) // page_size_num if query_total else 0
            return query_data, query_total or 0, query_total_pages

        # 测试通用分页函数
        data, total, pages = paginate_query(select(User), page_num=2, page_size_num=2)
        print(f"第 2 页用户:{data},总条数:{total},总页数:{pages}")  # 输出: 第 2 页用户:[<User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>],总条数:4,总页数:2
    finally:
        db.close()


# ====================== 执行所有测试 ======================
if __name__ == "__main__":
    # 初始化测试数据(仅首次执行)
    init_data()
    # 执行筛选查询
    filter_query()
    # 执行聚合查询
    aggregate_query()
    # 执行关联查询
    join_query()
    # 执行子查询
    sub_query()
    # 执行分页查询
    pagination_query()

2.关系映射

SQLAlchemy 提供 relationship 实现表之间的关联关系,无需手动写 JOIN 语句。

2.1.定义模型

from typing import List, Optional

from sqlalchemy import (
    Column, Integer, String, ForeignKey, Boolean, Table, func, DateTime
)
from sqlalchemy.orm import (
    Session, DeclarativeBase, relationship, Mapped, mapped_column
)

# 假设从优化后的配置导入会话和引擎
from sync_crud import get_db_session
from sync_session import sync_engine


# ====================== 基础基类(2.0+ 规范) ======================
class Base(DeclarativeBase):
    """所有模型的基类(2.0+ 推荐 DeclarativeBase)"""
    __abstract__ = True

    # 通用字段:创建/更新时间(所有表复用)
    create_time = mapped_column(DateTime, default=func.now(), nullable=False, comment="创建时间")
    update_time = mapped_column(DateTime, default=func.now(), onupdate=func.now(), nullable=False, comment="更新时间")


# ====================== 2.2 一对多(用户 - 文章) ======================
# 推荐:先定义被关联模型(Article),避免循环引用;或使用字符串引用
class Article(Base):
    __tablename__ = "articles"

    # 2.0+ 推荐使用 mapped_column 替代旧 Column 写法(类型注解更友好)
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="文章ID")
    title: Mapped[str] = mapped_column(String(100), nullable=False, index=True, comment="文章标题")
    content: Mapped[Optional[str]] = mapped_column(String(500), comment="文章内容")
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id", ondelete="CASCADE"), comment="关联用户ID")

    # 多对一关联:多个文章属于一个用户
    author: Mapped["User"] = relationship(
        "User",
        back_populates="articles",
        lazy="selectin"  # 2.0+ 推荐的加载策略,避免N+1查询
    )

    def __repr__(self) -> str:
        return f"<Article(id={self.id}, title='{self.title}', user_id={self.user_id})>"


# ====================== 2.4 多对多(用户 - 角色) ======================
# 中间表(2.0+ 规范写法)
user_role = Table(
    "user_role",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id", ondelete="CASCADE"), primary_key=True, comment="用户ID"),
    Column("role_id", Integer, ForeignKey("roles.id", ondelete="CASCADE"), primary_key=True, comment="角色ID"),
    # 新增中间表通用字段(可选)
    Column("create_time", DateTime, default=func.now(), nullable=False, comment="关联创建时间")
)


class Role(Base):
    __tablename__ = "roles"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="角色ID")
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, comment="角色名称")

    # 多对多关联:一个角色对应多个用户
    users: Mapped[List["User"]] = relationship(
        "User",
        secondary=user_role,  # 指定中间表
        back_populates="roles",
        cascade="all",
        lazy="selectin"
    )


class User(Base):
    __tablename__ = "users"

    # 2.0+ 新写法:Mapped + mapped_column(类型提示更精准)
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="用户ID")
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True, comment="用户名")
    email: Mapped[Optional[str]] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")

    # 一对多关联:一个用户有多个文章
    articles: Mapped[List["Article"]] = relationship(
        "Article",
        back_populates="author",
        cascade="all, delete-orphan",  # 级联删除:删用户时删文章,删文章自动解除关联
        passive_deletes=True,  # 配合外键 ondelete="CASCADE",提升删除性能
        lazy="selectin"  # 预加载关联数据,避免N+1查询
    )

    # 一对一关联:uselist=False 表示非列表(一对一)
    profile: Mapped[Optional["UserProfile"]] = relationship(
        "UserProfile",
        back_populates="user",
        uselist=False,
        cascade="all, delete-orphan",
        passive_deletes=True,
        lazy="selectin"
    )

    # 多对多关联:一个用户对应多个角色
    roles: Mapped[List["Role"]] = relationship(
        "Role",
        secondary=user_role,
        back_populates="users",
        cascade="all",
        lazy="selectin"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"


# ====================== 2.3 一对一(用户 - 用户详情) ======================
class UserProfile(Base):
    __tablename__ = "user_profiles"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="详情ID")
    address: Mapped[Optional[str]] = mapped_column(String(200), comment="地址")
    phone: Mapped[Optional[str]] = mapped_column(String(20), comment="手机号")
    # 唯一外键:保证一对一关系
    user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("users.id", ondelete="CASCADE"),
        unique=True,
        comment="关联用户ID"
    )

    # 一对一关联:一个详情属于一个用户
    user: Mapped["User"] = relationship(
        "User",
        back_populates="profile",
        lazy="selectin"
    )


# ====================== 2.1 工具函数:初始化表和测试数据 ======================
def init_relationship_tables():
    """初始化关系表(删除旧表+创建新表)"""
    db: Session = get_db_session()
    try:
        # 先删除旧表(按依赖顺序)
        Base.metadata.drop_all(bind=sync_engine)
        # 创建新表
        Base.metadata.create_all(bind=sync_engine)
        print("关系表初始化完成")
    finally:
        db.close()

查询生成的表:

在这里插入图片描述

2.2.一对多(用户 - 文章)

# ====================== 2.2 一对多使用示例 ======================
def one_to_many():
    """测试一对多关系(修复PyCharm报错+规范写法)"""
    db: Session = get_db_session()
    try:
        # 1. 新增用户+文章(修复PyCharm参数报错:使用类型注解+分步创建)
        # 方式1:推荐写法(分步创建,无IDE报错)
        new_user = User(username="tianqi", email="tianqi@example.com")
        # 创建文章并关联作者(推荐)
        article1 = Article(title="SQLAlchemy关系映射", content="一对多示例", author=new_user)
        article2 = Article(title="PythonORM", content="ORM入门", author=new_user)
        db.add_all([new_user, article1, article2])
        db.flush()  # 预提交,生成ID
        db.commit()
        print("新增用户+文章完成")

        # 2. 查询用户的所有文章(2.0+ select 构造器)
        from sqlalchemy import select
        stmt = select(User).where(User.username == "tianqi")
        user = db.scalars(stmt).first()
        if user:
            print(f"\n{user.username}的文章:", [art.title for art in user.articles])   # 输出: tianqi的文章: ['SQLAlchemy关系映射', 'PythonORM']

        # 3. 查询文章的作者
        stmt = select(Article).where(Article.title == "SQLAlchemy关系映射")
        article = db.scalars(stmt).first()
        if article:
            print(f"{article.title}的作者:", article.author.username)  # 输出: SQLAlchemy关系映射的作者: tianqi

    except Exception as e:
        db.rollback()
        raise RuntimeError(f"一对多测试失败:{str(e)}") from e
    finally:
        db.close()

参数说明

  • back_populates:反向关联(对应另一张表的关系字段);
  • cascade="all, delete-orphan":级联操作(删除用户时自动删除其文章);
  • ForeignKey("users.id"):外键约束(关联用户表的 id 字段)。

在这里插入图片描述

2.3.一对一(用户 - 用户详情)

# ====================== 2.3 一对一使用示例) ======================
def one_to_one():
    """测试一对一关系"""
    db: Session = get_db_session()
    try:
        # 1. 查询用户
        from sqlalchemy import select
        stmt = select(User).where(User.username == "tianqi")
        user = db.scalars(stmt).first()
        if user:
            # 2. 新增/更新用户详情
            if user.profile:
                db.delete(user.profile)
                db.flush()

            user.profile = UserProfile(address="北京市海淀区", phone="13800138000")
            db.commit()
            # 输出: tianqi的地址: 北京市海淀区
            print(f"\n{user.username}的地址:", user.profile.address)  # type: ignore
            # 输出: tianqi的手机号: 13800138000
            print(f"{user.username}的手机号:", user.profile.phone)  # type: ignore

    except Exception as e:
        db.rollback()
        raise RuntimeError(f"一对一测试失败:{str(e)}") from e
    finally:
        db.close()

执行后,查询结果:

在这里插入图片描述

2.4.多对多(用户 - 角色)

多对多需要中间表,示例:

# ====================== 2.4 多对多使用示例(优化版) ======================
def many_to_many():
    """测试多对多关系"""
    db: Session = get_db_session()
    try:
        # 1. 新增角色
        admin_role = Role(name="admin")
        guest_role = Role(name="guest")
        db.add_all([admin_role, guest_role])
        db.commit()

        # 2. 给用户分配角色
        from sqlalchemy import select
        stmt = select(User).where(User.username == "tianqi")
        user = db.scalars(stmt).first()
        if user:
            user.roles.append(admin_role)
            user.roles.append(guest_role)
            db.commit()

            # 3. 查询用户的角色
            print(f"\n{user.username}的角色:", [role.name for role in user.roles]) # 输出: tianqi的角色: ['admin', 'guest']

            # 4. 查询角色下的用户
            stmt = select(Role).where(Role.name == "admin")
            role = db.scalars(stmt).first()
            if role:
                print(f"{role.name}角色下的用户:", [u.username for u in role.users])  # 输出: admin角色下的用户: ['tianqi']

    except Exception as e:
        db.rollback()
        raise RuntimeError(f"多对多测试失败:{str(e)}") from e
    finally:
        db.close()


# ====================== 执行所有测试 ======================
if __name__ == "__main__":
    # 1. 初始化表
    init_relationship_tables()
    # 2. 测试一对多
    one_to_many()
    # 3. 测试一对一
    one_to_one()
    # 4. 测试多对多
    many_to_many()

执行后,查询结果:

在这里插入图片描述

3.执行原生 SQL

虽然 ORM 很方便,但有时需要执行原生 SQL(如复杂查询、存储过程):

from sqlite3 import Row
from typing import List, Tuple, Optional, Sequence

from sqlalchemy import Integer, String, text
from sqlalchemy.engine import Result  # 2.0+ 结果类型注解
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column

# 定义数据库引擎/会话
from sync_crud import get_db_session
from sync_session import sync_engine


# ====================== 基础配置 ======================
class Base(DeclarativeBase):
    __abstract__ = True


# ====================== User 模型优化 ======================
class User(Base):
    __tablename__ = "users"

    # 2.0+ 新写法:Mapped + mapped_column(类型提示更精准)
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="用户ID")
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True, comment="用户名")
    email: Mapped[Optional[str]] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
    age: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="年龄")
    balance: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="余额")

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}', age={self.age}, balance={self.balance})>"


# ====================== 工具函数:初始化表和测试数据 ======================
def init_tables():
    """初始化关系表(删除旧表+创建新表)"""
    db: Session = get_db_session()
    try:
        # 先删除旧表(按依赖顺序)
        Base.metadata.drop_all(bind=sync_engine)
        # 创建新表
        Base.metadata.create_all(bind=sync_engine)
        print("关系表初始化完成")
    finally:
        db.close()


# ====================== 核心:原生SQL操作(2.0+ 优化版) ======================
def execute_sql() -> None:
    """执行原生SQL操作"""
    db: Session = get_db_session()  # 获取会话(封装为可复用函数)
    try:
        # 1. 单条插入数据(获取插入后的自增ID)
        single_insert_sql = text("""
                    INSERT INTO users (username, email, age, balance) 
                    VALUES (:username, :email, :age, :balance)
                """)
        single_insert_data = {"username": "wushi", "email": "wushi@example.com", "age": 30, "balance": 100}
        # MySQL 下获取自增ID(不同数据库语法不同)
        if sync_engine.dialect.name == "mysql":
            db.execute(single_insert_sql, single_insert_data)
            last_id = db.execute(text("SELECT LAST_INSERT_ID()")).scalar()
            db.commit()
            print(f"单条插入完成,自增ID:{last_id}")

        # 2. 批量插入数据(2.0+ 优化:executemany 模式,性能更高)
        batch_insert_sql = text("""
            INSERT INTO users (username, email, age, balance) 
            VALUES (:username, :email, :age, :balance)
        """)
        batch_data: List[dict] = [
            {"username": "zhangsan", "email": "zhangsan@example.com", "age": 20, "balance": 100},
            {"username": "lisi", "email": "lisi@example.com", "age": 22, "balance": 100},
            {"username": "wangwu", "email": "wangwu@example.com", "age": 25, "balance": 100}
        ]
        # 2.0+ 推荐使用 executemany_mode="values" 优化批量插入
        result: Result = db.execute(
            batch_insert_sql,
            batch_data,
            execution_options={"executemany_mode": "values"}  # 批量插入优化
        )
        db.commit()
        # 批量创建数据完成,影响行数:3
        print(f"批量创建数据完成,影响行数:{result.rowcount}")  # type: ignore[attr-defined]

        # 3. 执行查询
        query_sql = text("SELECT username, age FROM users WHERE age > :age")
        query_result: Result = db.execute(query_sql, {"age": 20})
        all_results: Sequence[Row] = query_result.fetchall()
        print("原生 SQL 查询结果:", all_results)  # 输出: [('lisi', 22), ('wangwu', 25)]

        # 4. 执行单条查询(返回单个元组,2.0+ 空值处理优化)
        single_sql = text("SELECT username FROM users WHERE id = :id")
        single_result: Optional[Tuple[str]] = db.execute(single_sql, {"id": 1}).fetchone()
        if single_result:
            print("单个结果:", single_result[0])  # 输出: zhangsan
        else:
            print("未找到 ID 为 1 的用户")

        # 5. 执行更新操作
        update_sql = text("UPDATE users SET age = :age WHERE username = :username")
        update_result: Result = db.execute(update_sql, {"age": 23, "username": "zhangsan"})
        db.commit()
        # 使用rowcount属性
        update_row_count = update_result.rowcount  # type: ignore[attr-defined]
        print(f"原生 SQL 修改完成,影响行数:{update_row_count}")  # 输出: 1

    except Exception as e:
        db.rollback()  # 异常回滚,保证数据一致性
        raise RuntimeError(f"原生SQL操作失败:{str(e)}") from e  # 保留异常栈
    finally:
        db.close()  # 确保会话关闭(2.0+ 同步会话必须显式关闭)


# ====================== 执行入口 ======================
if __name__ == "__main__":
    # 1. 初始化表
    init_tables()
    # 2. 执行原生SQL操作
    execute_sql()

4.事务处理

事务保证多个操作要么全部成功,要么全部失败,SQLAlchemy 会话默认开启事务:

from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError

# ====================== 核心:转账事务(2.0+ 优化版) ======================
def transfer_money(
        db: Session,
        from_username: str,
        to_username: str,
        amount: int
) -> Optional[tuple[bool, str]]:
    """
    安全的转账事务操作(2.0+ 规范)
    :param db: 数据库会话
    :param from_username: 转出用户名
    :param to_username: 转入用户名
    :param amount: 转账金额(正数)
    :return: (是否成功, 提示信息)
    """
    # 前置校验:金额合法性
    if amount <= 0:
        return False, "转账金额必须大于0"

    # 模拟异常
    if amount == 100:
        raise Exception("模拟异常")

    try:
        # 1. 2.0+ 推荐使用 select 构造器(替代旧 query())
        stmt_a = select(User).where(User.username == from_username)
        stmt_b = select(User).where(User.username == to_username)

        # 2. 悲观锁:锁定行,避免并发修改(关键!防止超卖/余额不一致)
        # for update:2.0+ 兼容,锁定查询到的行直到事务结束
        user_a: Optional[User] = db.scalars(stmt_a.with_for_update()).first()
        user_b: Optional[User] = db.scalars(stmt_b.with_for_update()).first()

        # 3. 业务校验:用户存在性 + 余额充足性
        if not user_a:
            return False, f"转出用户 {from_username} 不存在"
        if not user_b:
            return False, f"转入用户 {to_username} 不存在"
        if user_a.balance < amount:
            return False, f"用户 {from_username} 余额不足(当前:{user_a.balance},需转出:{amount})"

        # 4. 原子性更新操作
        user_a.balance -= amount
        user_b.balance += amount

        # 5. 可选:批量更新(替代对象修改,性能更高)
        # 适用于高并发场景,直接执行UPDATE语句,减少ORM对象操作
        # stmt_update_a = update(User).where(User.username == from_username).values(balance=User.balance - amount)
        # stmt_update_b = update(User).where(User.username == to_username).values(balance=User.balance + amount)
        # db.execute(stmt_update_a)
        # db.execute(stmt_update_b)

        # 6. 提交事务(2.0+ 同步提交,无需await)
        db.commit()

        # 7. 刷新对象,获取最新数据(可选)
        db.refresh(user_a)
        db.refresh(user_b)

        return True, (
            f"转账成功!\n"
            f"{from_username} 余额:{user_a.balance}(原:{user_a.balance + amount})\n"
            f"{to_username} 余额:{user_b.balance}(原:{user_b.balance - amount})"
        )

    except SQLAlchemyError as e:
        # 2.0+ 专用异常捕获:仅捕获数据库相关异常,避免捕获所有异常
        db.rollback()
        return False, f"转账失败(数据库异常):{str(e)}"
    except Exception as e:
        # 其他业务异常
        db.rollback()
        return False, f"转账失败(业务异常):{str(e)}"
    finally:
        # 可选:关闭会话(若会话是函数内创建,否则由调用方管理)
        # db.close()
        pass

# ====================== 执行入口 ======================
if __name__ == "__main__":
    """执行原生SQL操作"""
    db: Session = get_db_session()  # 获取会话(封装为可复用函数)

    # 测试1:正常转账
    success, msg = transfer_money(db, "zhangsan", "lisi", 10)
    print("\n测试1 - 正常转账:", success, msg)
    # 输出:
    # 测试1 - 正常转账: True 转账成功!
    # zhangsan 余额:90(原:100)
    # lisi :110(原:100)

    # 测试2:余额不足(触发回滚)
    success, msg = transfer_money(db, "zhangsan", "lisi", 1000)
    print("\n测试2 - 余额不足:", success, msg)
    # 输出: 测试2 - 余额不足: False 用户 zhangsan 余额不足(当前:90,需转出:1000)

    # 测试3:用户不存在(触发回滚)
    success, msg = transfer_money(db, "alice", "lisi", 10)
    print("\n测试3 - 用户不存在:", success, msg)
    # 输出: 测试3 - 用户不存在: False 转出用户 alice 不存在

    # 测试4:手动触发异常(模拟业务报错)
    # 可取消注释测试:在transfer_money中添加 raise Exception("模拟异常")
    # success, msg = transfer_money(db, "zhangsan", "lisi", 100)
    # print("\n测试4 - 手动异常:", success, msg)

事务特性(ACID)

  • 原子性(Atomicity):操作不可分割;
  • 一致性(Consistency):数据状态一致;
  • 隔离性(Isolation):多个事务互不干扰;
  • 持久性(Durability):提交后数据永久保存。

在这里插入图片描述

5.优化查询

5.1.延迟加载 vs 立即加载

  • 延迟加载(默认):访问关联对象时才查询数据库(N+1 查询问题);
  • 立即加载(joinedload):通过 JOIN 一次性查询所有数据。
# ====================== 5.1 加载策略优化(解决N+1查询) ======================
def optimize_loading_strategy(db: Session):
    """2.0+ 加载策略优化(joinedload/selectinload)"""
    print("=== 5.1 加载策略优化 ===")
    
    # 问题:N+1查询(2.0+ 仍存在,需显式优化)
    print("\n【N+1查询示例】")
    users: List[User] = db.scalars(select(User)).all()
    for user in users[:2]:  # 仅演示前2个用户
        # 访问articles时触发新查询(N+1问题)
        print(f"用户 {user.username} 的文章数:{len(user.articles)}")

    # 优化1:joinedload(左连接,一次性加载,适合一对一/一对多)
    print("\n【joinedload优化(左连接)】")
    stmt = select(User).options(joinedload(User.articles))
    users = db.scalars(stmt).all()
    for user in users[:2]:
        # 无额外查询,直接访问关联数据
        print(f"用户 {user.username} 的文章:{[art.title for art in user.articles[:1]]}")

    # 优化2:selectinload(IN查询,适合多对多/大数据量一对多)
    print("\n【selectinload优化(IN查询)】")
    stmt = select(User).options(selectinload(User.roles))
    users = db.scalars(stmt).all()
    for user in users[:2]:
        print(f"用户 {user.username} 的角色:{[role.name for role in user.roles]}")

    # 扩展:2.0+ 高级加载策略
    # 1. lazyload:强制延迟加载(覆盖模型默认)
    stmt = select(User).options(lazyload(User.articles))
    # 2. contains_eager:配合手动JOIN,复用已有连接
    stmt = select(User).join(User.articles).options(contains_eager(User.articles))

5.2.只查询指定字段

避免查询所有字段,提升性能:

# ====================== 5.2 只查询指定字段 ======================
def optimize_selected_fields(db: Session):
    """字段筛选优化(减少数据传输)"""
    
    # 基础写法:只查询指定字段(返回元组)
    print("\n【基础字段筛选】")
    stmt = select(User.username, User.email)
    result: Result = db.execute(stmt)
    user_fields: List[Tuple[str, str]] = result.all()
    for username, email in user_fields[:3]:
        print(f"用户名:{username},邮箱:{email}")

    # 进阶:映射为字典(更易用)
    print("\n【字段筛选+映射字典】")
    stmt = select(
        User.username.label("name"),
        User.age.label("user_age")
    ).where(User.age > 18)
    result = db.execute(stmt)
    # 转为字典列表(2.0+ Result对象支持mappings())
    user_dicts = [dict(row) for row in result.mappings()]
    for user in user_dicts[:3]:
        print(f"姓名:{user['name']},年龄:{user['user_age']}")

    # 性能优化:count查询仅查主键(避免count(*))
    print("\n【count优化】")
    total = db.scalar(select(func.count(User.id)))  # 比count(*)更快
    print(f"总用户数:{total}")

5.3.使用索引

在频繁查询的字段上创建索引:

# User模型
class User(Base):
    __tablename__ = "users"
    
    # 基础字段 + 单字段索引
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True)
    username: Mapped[str] = mapped_column(
        String(50), unique=True, nullable=False, index=True, comment="用户名(索引)"
    )
    email: Mapped[str] = mapped_column(
        String(100), unique=True, nullable=False, index=True, comment="邮箱(唯一索引)"
    )
    age: Mapped[int] = mapped_column(Integer, default=0, comment="年龄")
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")
    balance: Mapped[int] = mapped_column(Integer, default=0, comment="余额")

    # 关联关系(默认lazy="select",即延迟加载)
    articles: Mapped[List[Article]] = relationship("Article", back_populates="author")
    roles: Mapped[List[Role]] = relationship("Role", secondary=user_role, back_populates="users")

    # 2.0+ 复合索引(生产级优化:覆盖高频查询条件)
    __table_args__ = (
        # 复合索引:age + is_active(匹配 WHERE age > ? AND is_active = ?)
        Index("idx_age_active", "age", "is_active"),
        # 唯一索引(避免重复)
        Index("idx_username_unique", "username", unique=True),
        # 部分索引(仅对激活用户生效,SQLite不支持,MySQL/PG支持)
        # Index("idx_active_email", "email", postgresql_where=(is_active == True)),
    )
    
# ====================== 5.3 索引优化(2.0+ 规范) ======================
def index_optimization_demo():
    """索引优化说明(模型定义规范)"""
    print("\n=== 5.3 索引优化 ===")
    print("✅ 单字段索引:username/email/index=True(高频查询字段)")
    print("✅ 复合索引:idx_age_active(匹配多字段查询条件)")
    print("✅ 唯一索引:username/email(避免重复数据)")
    print("✅ 部分索引:仅对激活用户生效(MySQL/PG支持,SQLite不支持)")
    print("❌ 避免过度索引:更新频繁的字段(如balance)不建索引")
   

5.4.批量操作

避免循环单条操作,使用批量增删改:

# ====================== 5.4 批量操作(2.0+ 性能优化) ======================
def batch_operations_optimization(db: Session):
    """批量操作优化(减少数据库交互)"""
    
    # 1. 批量插入
    print("\n【批量插入】")
    batch_users = [
        User(username=f"user{i}", email=f"user{i}@example.com", age=18+i)
        for i in range(1, 4)
    ]
    db.add_all(batch_users)
    db.commit()
    print(f"批量插入 {len(batch_users)} 个用户完成")

    # 2. 批量更新
    print("\n【批量更新】")
    stmt = update(User).where(User.age < 20).values(is_active=False)
    result = db.execute(stmt)
    db.commit()
    print(f"批量更新 {result.rowcount} 条记录(年龄<20的用户设为非激活)")

    # 3. 批量删除
    print("\n【批量删除】")
    stmt = delete(User).where(User.is_active == False)
    result = db.execute(stmt)
    db.commit()
    print(f"批量删除 {result.rowcount} 条记录(非激活用户)")

    # 批量插入优化(executemany_mode)
    # 适用于MySQL,转为 VALUES (...), (...) 格式
    # db.execute(
    #     insert(User),
    #     [{"username": "u1", "email": "u1@com"}, ...],
    #     execution_options={"executemany_mode": "values"}
    # )

五、实战案例

1.博客系统

1.1.需求分析

实现一个简易博客系统,包含以下功能:

  • 用户管理(注册、查询、修改、删除);
  • 文章管理(发布、查询、修改、删除);
  • 评论管理(新增、查询、删除);
  • 角色权限(管理员 / 普通用户)。

项目结构:

D:\Workspaces\python\test\Test01\blog
│   blog.db
│   core.py
│   crud.py
│   __init__.py

1.2.数据模型设计

项目目录下,创建 core.py 文件:

import datetime
from typing import List, Optional, Generator

# SQLAlchemy 2.0 核心导入
from sqlalchemy import (
    Boolean,
    DateTime,
    ForeignKey,
    Integer,
    String,
    create_engine,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
    sessionmaker,
    Session,
)


# --------------------------- 基础配置 ---------------------------
class Base(DeclarativeBase):
    """所有模型的基类"""
    # 为所有模型添加通用的创建时间字段
    __abstract__ = True  # 抽象基类,不会创建表

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    create_time: Mapped[datetime.datetime] = mapped_column(
        DateTime, default=lambda: datetime.datetime.now(datetime.UTC), comment="创建时间"
    )


# 数据库连接配置
DATABASE_URL = "sqlite:///./blog.db"
engine = create_engine(
    DATABASE_URL,
    # check_same_thread=False 是 SQLite 专属配置:
    # 允许同一个数据库连接被多个线程共享使用,适配 SQLAlchemy 连接池的多线程场景;
    # 解除「连接必须和创建它的线程绑定」的限制,避免 ProgrammingError 报错;
    connect_args={
        "check_same_thread": False, # 关闭 SQLite 的线程检查机制
        "uri": True  # 启用URI模式
    },
    # 或使用文件锁保证写安全
    execution_options={"isolation_level": "SERIALIZABLE"},
    echo=True,  # 设为 True 可打印 SQL 语句,便于调试
)

# 创建会话工厂
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    expire_on_commit=False,  # 提交后不立即过期对象
)


# --------------------------- 数据模型 ---------------------------
class Role(Base):
    """角色模型"""
    __tablename__ = "roles"

    name: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="角色名称")
    description: Mapped[Optional[str]] = mapped_column(String(200), nullable=True, comment="角色描述")

    # 多对多关联用户
    users: Mapped[List["User"]] = relationship(
        "User", secondary="user_roles", back_populates="roles"
    )

    def __repr__(self) -> str:
        return f"<Role(id={self.id}, name={self.name})>"


class UserRole(Base):
    """用户-角色 多对多中间表"""
    __tablename__ = "user_roles"
    __table_args__ = {"comment": "用户角色关联表"}  # 表注释

    # 复合主键
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id"), primary_key=True, comment="用户ID"
    )
    role_id: Mapped[int] = mapped_column(
        ForeignKey("roles.id"), primary_key=True, comment="角色ID"
    )


class User(Base):
    """用户模型"""
    __tablename__ = "users"
    __table_args__ = {"comment": "用户表"}

    username: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="用户名")
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
    password: Mapped[str] = mapped_column(String(100), comment="密码(建议加密存储)")
    is_active: Mapped[bool] = mapped_column(
        Boolean, default=True, comment="是否激活"
    )
    age: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, comment="年龄")

    # 关联角色(多对多)
    roles: Mapped[List[Role]] = relationship(
        "Role", secondary="user_roles", back_populates="users"
    )
    # 关联文章(一对多)
    articles: Mapped[List["Article"]] = relationship(
        "Article", back_populates="author", cascade="all, delete-orphan"
    )
    # 关联评论(一对多)
    comments: Mapped[List["Comment"]] = relationship(
        "Comment", back_populates="author", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username={self.username}, email={self.email})>"


class Article(Base):
    """文章模型"""
    __tablename__ = "articles"
    __table_args__ = {"comment": "文章表"}

    title: Mapped[str] = mapped_column(String(100), index=True, comment="文章标题")
    content: Mapped[str] = mapped_column(String, comment="文章内容")
    read_count: Mapped[int] = mapped_column(
        Integer, default=0, comment="阅读量"
    )
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id"), comment="作者ID"
    )

    # 关联作者
    author: Mapped["User"] = relationship(
        "User", back_populates="articles"
    )
    # 关联评论(一对多,删除文章时级联删除评论)
    comments: Mapped[List["Comment"]] = relationship(
        "Comment", back_populates="article", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<Article(id={self.id}, title={self.title[:20]}...)>"


class Comment(Base):
    """评论模型"""
    __tablename__ = "comments"
    __table_args__ = {"comment": "评论表"}

    content: Mapped[str] = mapped_column(String(500), comment="评论内容")
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id"), comment="评论用户ID"
    )
    article_id: Mapped[int] = mapped_column(
        ForeignKey("articles.id"), comment="关联文章ID"
    )

    # 关联用户
    author: Mapped["User"] = relationship(
        "User", back_populates="comments"
    )
    # 关联文章
    article: Mapped["Article"] = relationship(
        "Article", back_populates="comments"
    )

    def __repr__(self) -> str:
        return f"<Comment(id={self.id}, content={self.content[:20]}...)>"


# --------------------------- 数据库操作工具函数 ---------------------------
def create_tables() -> None:
    """创建所有表"""
    try:
        Base.metadata.create_all(bind=engine)
        print("数据库表创建成功!")
    except SQLAlchemyError as ex:
        print(f"创建表失败:{ex}")
        raise


def get_db() -> Generator[Session, None, None]:
    """
    获取数据库会话(依赖注入风格)
    2.0 版本推荐使用 Session 类型提示
    """
    db_session: Session = SessionLocal()
    try:
        yield db_session
    except SQLAlchemyError as ex:
        db_session.rollback()  # 异常时回滚
        print(f"数据库操作异常:{ex}")
        raise
    finally:
        db_session.close()


# --------------------------- 测试代码 ---------------------------
if __name__ == "__main__":
    # 创建所有表
    create_tables()

    # 测试代码
    # try:
    #     # with语句自动管理会话,结束后自动关闭
    #     with SessionLocal() as db:
    #         print("数据库会话创建成功!")
    #
    #         admin_role = db.query(Role).filter(Role.name == "admin").first()
    #         if not admin_role:
    #             admin_role = Role(name="admin", description="管理员")
    #             db.add(admin_role)
    #             db.commit()
    #             db.refresh(admin_role)
    #             print(f"添加测试角色成功:{admin_role}")
    #         else:
    #             print(f"admin角色已存在:{admin_role}")
    # except Exception as e:
    #     print(f"测试过程出错:{e}")

执行上述代码,会自动创建 blog.db 数据库文件,查询生成的表结构:

在这里插入图片描述

1.3.初始化数据(SQL 脚本)

-- 插入角色
INSERT INTO roles (name, description) VALUES 
('admin', '系统管理员,拥有所有权限'),
('user', '普通用户,只能操作自己的内容');

-- 插入管理员用户(密码:123456,实际项目需加密)
INSERT INTO users (username, email, password, is_active) VALUES 
('admin', 'admin@example.com', '123456', 1);

-- 给管理员分配角色
INSERT INTO user_role (user_id, role_id) VALUES (1, 1);

执行后,查询生成的表数据:

在这里插入图片描述

1.4.核心功能实现

项目目录下,创建 crud.py 文件:

import os
import sys
from dataclasses import dataclass
from typing import Dict, Optional, Any, Sequence

# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# SQLAlchemy 2.0 核心导入
from sqlalchemy import (
    select,
    func,
    delete,
    update,
    or_
)
from sqlalchemy.engine import Result
from sqlalchemy.exc import (
    SQLAlchemyError,
    IntegrityError
)
from sqlalchemy.orm import (
    Session,
    joinedload,
    selectinload
)

# 导入模型和数据库会话
from blog.core import User, Role, Article, Comment, get_db


# --------------------------- 数据结构定义 ---------------------------
@dataclass
class PaginationResult:
    """分页结果数据结构"""
    items: Sequence[Any]
    total: int
    page: int
    page_size: int
    total_pages: int

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典格式"""
        return {
            "items": [item.__dict__ for item in self.items],  # 可替换为自定义序列化逻辑
            "total": self.total,
            "page": self.page,
            "page_size": self.page_size,
            "total_pages": self.total_pages
        }


# --------------------------- 异常定义 ---------------------------
class BusinessException(Exception):
    """业务异常基类"""

    def __init__(self, message: str, code: int = 400):
        self.message = message
        self.code = code
        super().__init__(self.message)


class ResourceExistsException(BusinessException):
    """资源已存在异常"""

    def __init__(self, message: str):
        super().__init__(message, 409)


class ResourceNotFoundException(BusinessException):
    """资源不存在异常"""

    def __init__(self, message: str):
        super().__init__(message, 404)


class PermissionDeniedException(BusinessException):
    """权限不足异常"""

    def __init__(self, message: str):
        super().__init__(message, 403)


# --------------------------- 通用工具函数 ---------------------------
def commit_with_retry(db: Session, max_retries: int = 3) -> None:
    """
    提交事务并支持重试(处理并发冲突)
    """
    retries = 0
    while retries < max_retries:
        try:
            db.commit()
            return
        except SQLAlchemyError as ex:
            retries += 1
            db.rollback()
            if retries >= max_retries:
                raise BusinessException(f"数据库操作失败:{str(ex)}")


def get_password_hash(password: str) -> str:
    """
    密码加密(生产环境必需)
    """
    return password


# --------------------------- 用户管理 ---------------------------
def create_user(
        db: Session,
        username: str,
        email: str,
        password: str,
        age: Optional[int] = None
) -> User:
    """
    创建用户(SQLAlchemy 2.0 优化版)

    Args:
        db: 数据库会话
        username: 用户名
        email: 邮箱
        password: 原始密码(自动加密)
        age: 年龄

    Returns:
        创建的用户对象

    Raises:
        ResourceExistsException: 用户名/邮箱已存在
        BusinessException: 数据库操作失败
    """
    # 2.0 优化:一次查询检查用户名和邮箱,减少数据库交互
    stmt = select(User).where(
        or_(User.username == username, User.email == email)
    )
    existing_user: Optional[User] = db.execute(stmt).scalar_one_or_none()

    if existing_user:
        if existing_user.username == username:
            raise ResourceExistsException("用户名已存在")
        else:
            raise ResourceExistsException("邮箱已存在")

    # 密码加密(生产环境必需)
    hashed_password = get_password_hash(password)

    # 创建用户
    new_user = User(
        username=username,
        email=email,
        password=hashed_password,
        is_active=True,
        age=age
    )

    # 分配普通用户角色(优化:使用 selectinload 减少N+1查询)
    user_role: Optional[Role] = db.execute(
        select(Role).where(Role.name == "user")
    ).scalar_one_or_none()

    if user_role:
        new_user.roles.append(user_role)

    try:
        db.add(new_user)
        commit_with_retry(db)
        db.refresh(new_user)
        # 刷新后加载角色信息
        db.execute(select(User).options(joinedload(User.roles)).where(User.id == new_user.id))
        return new_user
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"创建用户失败:{str(ex)}")


def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """
    根据用户名查询用户(包含角色)

    Args:
        db: 数据库会话
        username: 用户名

    Returns:
        用户对象或None
    """
    stmt = select(User).options(
        selectinload(User.roles)  # 2.0 推荐:selectinload 性能优于 joinedload 多对多
    ).where(User.username == username)

    return db.execute(stmt).scalar_one_or_none()


def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """
    根据ID查询用户(包含角色)

    Args:
        db: 数据库会话
        user_id: 用户ID

    Returns:
        用户对象或None
    """
    stmt = select(User).options(
        selectinload(User.roles)
    ).where(User.id == user_id)

    return db.execute(stmt).scalar_one_or_none()


def update_user(
        db: Session,
        user_id: int,
        **kwargs
) -> User:
    """
    修改用户信息

    Args:
        db: 数据库会话
        user_id: 用户ID
        **kwargs: 要更新的字段(如 email, age, is_active 等)

    Returns:
        更新后的用户对象

    Raises:
        ResourceNotFoundException: 用户不存在
        BusinessException: 更新失败
    """
    # 检查用户是否存在
    db_user = get_user_by_id(db, user_id)
    if not db_user:
        raise ResourceNotFoundException("用户不存在")

    # 过滤合法字段,防止更新敏感字段
    allowed_fields = {"username", "email", "age", "is_active"}
    update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}

    if not update_fields:
        return db_user

    # 2.0 优化:批量更新字段
    try:
        for key, value in update_fields.items():
            setattr(db_user, key, value)

        commit_with_retry(db)
        db.refresh(db_user)
        return db_user
    except IntegrityError as ex:
        db.rollback()
        raise ResourceExistsException(f"更新失败:{str(ex)}")
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"更新用户失败:{str(ex)}")


def delete_user(db: Session, user_id: int) -> bool:
    """
    删除用户(2.0 批量删除写法,无需先查询)

    Args:
        db: 数据库会话
        user_id: 用户ID

    Returns:
        删除成功返回True

    Raises:
        ResourceNotFoundException: 用户不存在
    """
    stmt = delete(User).where(User.id == user_id)
    result: Result = db.execute(stmt)

    if result.rowcount == 0:  # type: ignore[attr-defined]
        raise ResourceNotFoundException("用户不存在")

    commit_with_retry(db)
    return True


# --------------------------- 文章管理 ---------------------------
def create_article(
        db: Session,
        title: str,
        content: str,
        user_id: int
) -> Article:
    """
    发布文章

    Args:
        db: 数据库会话
        title: 文章标题
        content: 文章内容
        user_id: 作者ID

    Returns:
        文章对象

    Raises:
        ResourceNotFoundException: 用户不存在
        BusinessException: 创建失败
    """
    # 检查用户是否存在
    if not db.execute(select(User.id).where(User.id == user_id)).scalar():
        raise ResourceNotFoundException("用户不存在")

    new_article = Article(
        title=title,
        content=content,
        user_id=user_id
    )

    try:
        db.add(new_article)
        commit_with_retry(db)
        db.refresh(new_article)
        return new_article
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"发布文章失败:{str(ex)}")


def get_article_list(
        db: Session,
        page: int = 1,
        page_size: int = 10
) -> PaginationResult:
    """
    分页查询文章列表(包含作者)

    Args:
        db: 数据库会话
        page: 页码(默认1)
        page_size: 每页条数(默认10)

    Returns:
        分页结果对象
    """
    # 参数校验
    page = max(1, page)
    page_size = max(1, min(100, page_size))  # 限制最大页大小
    offset = (page - 1) * page_size

    # 2.0 优化:使用 with_for_update(read=True) 避免脏读(可选)
    # 查询文章列表(优化:只加载需要的字段)
    stmt = select(Article).options(
        joinedload(Article.author).load_only(User.id, User.username, User.email)
    ).order_by(Article.create_time.desc()).offset(offset).limit(page_size)

    articles: Sequence[Article] = db.execute(stmt).scalars().all()

    # 总数量(优化:避免子查询)
    total: int = db.execute(select(func.count(Article.id))).scalar()
    total_pages = (total + page_size - 1) // page_size

    return PaginationResult(
        items=articles,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages
    )


def get_article_detail(db: Session, article_id: int) -> Article:
    """
    查询文章详情(包含作者、评论及评论作者)

    Args:
        db: 数据库会话
        article_id: 文章ID

    Returns:
        文章对象

    Raises:
        ResourceNotFoundException: 文章不存在
    """
    stmt = select(Article).options(
        joinedload(Article.author).load_only(User.id, User.username),
        joinedload(Article.comments).joinedload(
            Comment.author
        ).load_only(User.id, User.username)
    ).where(Article.id == article_id)

    # 使用 unique() 方法处理连接集合导致的重复数据
    db_article: Optional[Article] = db.execute(stmt).unique().scalar_one_or_none()

    if not db_article:
        raise ResourceNotFoundException("文章不存在")

    # 阅读量+1(2.0 优化:使用 update 语句,无需查询后修改)
    db.execute(
        update(Article)
        .where(Article.id == article_id)
        .values(read_count=Article.read_count + 1)
    )
    commit_with_retry(db)

    # 刷新阅读量
    db.refresh(db_article, attribute_names=["read_count"])

    return db_article


def update_article(
        db: Session,
        article_id: int,
        user_id: int,
        **kwargs
) -> Article:
    """
    修改文章(仅作者可修改)

    Args:
        db: 数据库会话
        article_id: 文章ID
        user_id: 操作人ID
        **kwargs: 要更新的字段(title, content)

    Returns:
        更新后的文章对象

    Raises:
        ResourceNotFoundException: 文章/用户不存在
        PermissionDeniedException: 无权限
    """
    # 检查文章是否存在并验证权限
    stmt = select(Article).where(Article.id == article_id)
    db_article: Optional[Article] = db.execute(stmt).scalar_one_or_none()

    if not db_article:
        raise ResourceNotFoundException("文章不存在")

    if db_article.user_id != user_id:
        raise PermissionDeniedException("无权限修改该文章")

    # 过滤合法字段
    allowed_fields = {"title", "content"}
    update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}

    if not update_fields:
        return db_article

    try:
        for key, value in update_fields.items():
            setattr(db_article, key, value)

        commit_with_retry(db)
        db.refresh(db_article)
        return db_article
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"修改文章失败:{str(ex)}")


def delete_article(
        db: Session,
        article_id: int,
        user_id: int
) -> bool:
    """
    删除文章(仅作者/管理员可删除)

    Args:
        db: 数据库会话
        article_id: 文章ID
        user_id: 操作人ID

    Returns:
        删除成功返回True

    Raises:
        ResourceNotFoundException: 文章/用户不存在
        PermissionDeniedException: 无权限
    """
    # 检查文章是否存在
    db_article: Optional[Article] = db.execute(
        select(Article).where(Article.id == article_id)
    ).scalar_one_or_none()

    if not db_article:
        raise ResourceNotFoundException("文章不存在")

    # 检查用户和权限
    db_user = get_user_by_id(db, user_id)
    if not db_user:
        raise ResourceNotFoundException("操作用户不存在")

    is_admin = any(role.name == "admin" for role in db_user.roles)
    if db_article.user_id != user_id and not is_admin:
        raise PermissionDeniedException("无权限删除该文章")

    try:
        db.delete(db_article)
        commit_with_retry(db)
        return True
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"删除文章失败:{str(ex)}")


# --------------------------- 评论管理 ---------------------------
def create_comment(
        db: Session,
        content: str,
        user_id: int,
        article_id: int
) -> Comment:
    """
    新增评论

    Args:
        db: 数据库会话
        content: 评论内容
        user_id: 评论者ID
        article_id: 文章ID

    Returns:
        评论对象

    Raises:
        ResourceNotFoundException: 用户/文章不存在
    """
    # 2.0 优化:一次查询检查用户和文章
    user_exists = db.execute(select(User.id).where(User.id == user_id)).scalar()
    article_exists = db.execute(select(Article.id).where(Article.id == article_id)).scalar()

    if not user_exists:
        raise ResourceNotFoundException("用户不存在")
    if not article_exists:
        raise ResourceNotFoundException("文章不存在")

    db_comment = Comment(
        content=content,
        user_id=user_id,
        article_id=article_id
    )

    try:
        db.add(db_comment)
        commit_with_retry(db)
        db.refresh(db_comment)
        return db_comment
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"新增评论失败:{str(ex)}")


def delete_comment(
        db: Session,
        comment_id: int,
        user_id: int
) -> bool:
    """
    删除评论(仅评论者/管理员可删除)

    Args:
        db: 数据库会话
        comment_id: 评论ID
        user_id: 操作人ID

    Returns:
        删除成功返回True

    Raises:
        ResourceNotFoundException: 评论/用户不存在
        PermissionDeniedException: 无权限
    """
    # 检查评论是否存在
    db_comment: Optional[Comment] = db.execute(
        select(Comment).where(Comment.id == comment_id)
    ).scalar_one_or_none()

    if not db_comment:
        raise ResourceNotFoundException("评论不存在")

    # 检查用户和权限
    db_user = get_user_by_id(db, user_id)
    if not db_user:
        raise ResourceNotFoundException("操作用户不存在")

    is_admin = any(role.name == "admin" for role in db_user.roles)
    if db_comment.user_id != user_id and not is_admin:
        raise PermissionDeniedException("无权限删除该评论")

    try:
        db.delete(db_comment)
        commit_with_retry(db)
        return True
    except SQLAlchemyError as ex:
        db.rollback()
        raise BusinessException(f"删除评论失败:{str(ex)}")


# --------------------------- 测试功能 ---------------------------
if __name__ == "__main__":
    # 获取数据库会话
    db_session: Session = next(get_db())

    try:
        # 1. 初始化角色
        roles = db_session.execute(select(Role).where(Role.name.in_(["admin", "guest"]))).scalars().all()
        role_names = {role.name for role in roles}

        missing_roles = []
        if "admin" not in role_names:
            missing_roles.append(Role(name="admin", description="系统管理员"))
        if "guest" not in role_names:
            missing_roles.append(Role(name="guest", description="来宾用户"))

        if missing_roles:
            db_session.add_all(missing_roles)
            commit_with_retry(db_session)
            print("初始化角色成功")

        # 2. 创建测试用户
        try:
            user = create_user(
                db_session,
                username="test_user",
                email="test@example.com",
                password="123456",
                age=23
            )
            print(f"创建用户成功:{user.username} (ID: {user.id})")  # 输出: 创建用户成功:test_user (ID: 1)
        except ResourceExistsException as e:
            print(f"创建用户失败:{e.message}")
            # 获取已存在的用户
            user = get_user_by_username(db_session, "test_user")
            if not user:
                raise

        # 3. 发布测试文章
        article = create_article(
            db_session,
            title="我的第一篇博客",
            content="SQLAlchemy 2.0 实战案例",
            user_id=user.id
        )
        print(f"发布文章成功:{article.title} (ID: {article.id})")  # 输出: 发布文章成功:我的第一篇博客 (ID: 1)

        # 4. 新增评论
        comment = create_comment(
            db_session,
            content="这篇文章写得很好!",
            user_id=user.id,
            article_id=article.id
        )
        print(f"新增评论成功:{comment.content} (ID: {comment.id})")  # 输出: 新增评论成功:这篇文章写得很好! (ID: 1)

        # 5. 查询文章详情
        detail = get_article_detail(db_session, article.id)
        print("\n=== 文章详情 ===")
        print(f"标题:{detail.title}")  # 输出: 标题:我的第一篇博客
        print(f"作者:{detail.author.username}")  # 输出: 作者:test_user
        print(f"阅读量:{detail.read_count}")  # 输出: 阅读量:1
        print(f"评论数:{len(detail.comments)}")  # 输出: 评论数:1
        print(f"第一条评论:{detail.comments[0].content}")  # 输出: 第一条评论:这篇文章写得很好!
        print(f"评论作者:{detail.comments[0].author.username}")  # 输出: 评论作者:test_user

        # 6. 测试分页查询
        pagination = get_article_list(db_session, page=1, page_size=10)
        print(f"\n=== 分页查询 ===")
        print(f"总文章数:{pagination.total}")  # 输出: 总文章数:1
        print(f"当前页:{pagination.page}/{pagination.total_pages}")  # 输出: 当前页:1/1

    except BusinessException as e:
        print(f"业务异常:{e.message} (代码:{e.code})")
    except Exception as e:
        print(f"测试失败:{str(e)}")
        db_session.rollback()
    finally:
        # 关闭会话
        db_session.close()
        print("\n数据库会话已关闭")

执行后,会创建用户、发布文章、新增评论,并查询详情:
在这里插入图片描述

2.电商系统

2.1.案例背景

核心业务模型:

  • 用户 (User):系统用户,可创建订单
  • 商品 (Product):商品信息,包含分类、库存、价格等
  • 订单 (Order):用户创建的订单,包含多个订单项
  • 订单项 (OrderItem):订单中的商品条目(关联订单和商品)
  • 商品分类 (Category):商品分类(一对多)

项目结构:

D:\Workspaces\python\test\Test01\mall
│   core.py
│   crud.py
│   mall.db
│   test.py
│   __init__.py

2.2.完整代码实现

2.2.1.核心模型与数据库配置

项目目录下,创建 core.py 文件:

# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 核心配置与数据模型
========================================
功能:
    1. 定义SQLAlchemy 2.0的基础模型(DeclarativeBase)
    2. 配置数据库连接与会话工厂
    3. 定义业务数据模型(用户、商品、分类、订单、订单项)
    4. 实现通用分页数据结构
依赖:
    - SQLAlchemy 2.0.48
    - Python 3.13.11
创建时间:2026-03-22
作者:inuex
版本:v1.0
"""
import datetime
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Generator, Sequence

from sqlalchemy import (
    create_engine,
    ForeignKey,
    Integer,
    String,
    Float,
    Boolean,
    DateTime,
    Text,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
    DeclarativeBase,
    Session,
    sessionmaker,
    Mapped,
    mapped_column,
    relationship,
)


# --------------------------- 基础配置 ---------------------------
class Base(DeclarativeBase):
    """所有模型的抽象基类"""
    __abstract__ = True

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    create_time: Mapped[datetime.datetime] = mapped_column(
        DateTime, default=datetime.datetime.now(datetime.UTC), comment="创建时间"
    )
    update_time: Mapped[datetime.datetime] = mapped_column(
        DateTime, default=datetime.datetime.now(datetime.UTC), onupdate=datetime.datetime.now(datetime.UTC),
        comment="更新时间"
    )
    is_deleted: Mapped[bool] = mapped_column(
        Boolean, default=False, comment="是否删除(软删除)"
    )


# 数据库连接(SQLite,可替换为MySQL/PostgreSQL)
DATABASE_URL = "sqlite:///./mall.db"
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False},
    echo=False,  # 设为True可打印SQL语句,便于学习
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# --------------------------- 数据模型 ---------------------------
class User(Base):
    """用户模型"""
    __tablename__ = "users"

    username: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="用户名")
    phone: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment="手机号")
    email: Mapped[str] = mapped_column(String(100), nullable=True, comment="邮箱")
    address: Mapped[str] = mapped_column(String(200), nullable=True, comment="收货地址")
    balance: Mapped[float] = mapped_column(Float, default=0.0, comment="账户余额")

    # 关联订单(一对多)
    orders: Mapped[List["Order"]] = relationship(
        "Order", back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username={self.username}, balance={self.balance})>"


class Category(Base):
    """商品分类模型"""
    __tablename__ = "categories"

    name: Mapped[str] = mapped_column(String(50), unique=True, comment="分类名称")
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True, comment="分类描述")

    # 关联商品(一对多)
    products: Mapped[List["Product"]] = relationship(
        "Product", back_populates="category", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<Category(id={self.id}, name={self.name})>"


class Product(Base):
    """商品模型"""
    __tablename__ = "products"

    name: Mapped[str] = mapped_column(String(100), index=True, comment="商品名称")
    price: Mapped[float] = mapped_column(Float, comment="商品价格")
    stock: Mapped[int] = mapped_column(Integer, default=0, comment="库存数量")
    sales: Mapped[int] = mapped_column(Integer, default=0, comment="销量")
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True, comment="商品描述")
    category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"), comment="分类ID")

    # 关联分类
    category: Mapped["Category"] = relationship("Category", back_populates="products")
    # 关联订单项
    order_items: Mapped[List["OrderItem"]] = relationship(
        "OrderItem", back_populates="product", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<Product(id={self.id}, name={self.name}, price={self.price}, stock={self.stock})>"


class Order(Base):
    """订单模型"""
    __tablename__ = "orders"

    order_no: Mapped[str] = mapped_column(String(32), unique=True, index=True, comment="订单编号")
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), comment="用户ID")
    total_amount: Mapped[float] = mapped_column(Float, default=0.0, comment="订单总金额")
    status: Mapped[str] = mapped_column(String(20), default="pending",
                                        comment="订单状态:pending/paid/shipped/completed/cancelled")
    payment_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime, nullable=True, comment="支付时间")

    # 关联用户
    user: Mapped["User"] = relationship("User", back_populates="orders")
    # 关联订单项
    items: Mapped[List["OrderItem"]] = relationship(
        "OrderItem", back_populates="order", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<Order(id={self.id}, order_no={self.order_no}, status={self.status}, total_amount={self.total_amount})>"


class OrderItem(Base):
    """订单项模型(订单-商品 多对多关联)"""
    __tablename__ = "order_items"

    order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), comment="订单ID")
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), comment="商品ID")
    quantity: Mapped[int] = mapped_column(Integer, default=1, comment="购买数量")
    unit_price: Mapped[float] = mapped_column(Float, comment="购买时的单价")

    # 关联订单
    order: Mapped["Order"] = relationship("Order", back_populates="items")
    # 关联商品
    product: Mapped["Product"] = relationship("Product", back_populates="order_items")

    def __repr__(self) -> str:
        return f"<OrderItem(id={self.id}, order_id={self.order_id}, product_id={self.product_id}, quantity={self.quantity})>"


# --------------------------- 分页数据结构 ---------------------------
@dataclass
class Pagination:
    """通用分页返回结构"""
    items: Sequence[Any]
    total: int
    page: int
    page_size: int
    total_pages: int
    has_next: bool
    has_prev: bool

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典格式"""
        return {
            "items": [i.__dict__ for i in self.items],
            "pagination": {
                "total": self.total,
                "page": self.page,
                "page_size": self.page_size,
                "total_pages": self.total_pages,
                "has_next": self.has_next,
                "has_prev": self.has_prev
            }
        }


# --------------------------- 核心业务逻辑 ---------------------------
def create_tables() -> None:
    """创建所有表"""
    Base.metadata.create_all(bind=engine)
    print("数据库表创建成功!")


def get_db() -> Generator[Session, None, None]:
    """获取数据库会话"""
    db_session: Session = SessionLocal()
    try:
        yield db_session
    except SQLAlchemyError as ex:
        db_session.rollback()
        raise ex
    finally:
        db_session.close()

2.2.2.数据操作层(CRUD)

项目目录下,创建 crud.py 文件:

# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 数据操作层(CRUD)
=====================================
功能:
    1. 实现商品、订单、用户等核心业务的增删改查
    2. 封装分页查询、条件过滤、排序等通用查询逻辑
    3. 处理电商核心业务逻辑(订单创建、库存扣减、状态更新)
    4. 实现事务管理与异常处理
依赖:
    - core.py(数据模型与数据库配置)
    - SQLAlchemy 2.0.48
创建时间:2026-03-22
作者:inuex
版本:v1.0
注意:
    1. 所有数据库操作需通过Session会话执行,禁止直接操作数据库连接
    2. 订单创建包含事务逻辑,异常时自动回滚
    3. 分页查询默认最大页大小为50,防止大数据量查询
"""
import datetime
from typing import List, Tuple, Sequence

from sqlalchemy import (
    select,
    func,
    and_,
    desc,
    asc,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
    Session,
    joinedload,
    selectinload,
)

from mall.core import Category, Product, User, Pagination, Order, OrderItem


# ====================== 商品管理 ======================
def create_product(
        db: Session,
        name: str,
        price: float,
        category_id: int,
        stock: int = 0,
        description: str = None
) -> Product:
    """创建商品"""
    # 检查分类是否存在
    category = db.execute(select(Category).where(Category.id == category_id)).scalar_one_or_none()
    if not category:
        raise ValueError(f"分类ID {category_id} 不存在")

    db_product = Product(
        name=name,
        price=price,
        stock=stock,
        description=description,
        category_id=category_id
    )

    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return db_product


def get_product_list(
        db: Session,
        page: int = 1,
        page_size: int = 10,
        category_id: int = None,
        min_price: float = None,
        max_price: float = None,
        keyword: str = None,
        sort_by: str = "create_time",
        sort_order: str = "desc"
) -> Pagination:
    """
    分页查询商品列表(支持多条件过滤、排序)
    :param db: 数据库会话
    :param page: 页码
    :param page_size: 每页条数
    :param category_id: 分类ID(可选)
    :param min_price: 最低价格(可选)
    :param max_price: 最高价格(可选)
    :param keyword: 商品名称关键词(可选)
    :param sort_by: 排序字段(create_time/price/stock/sales)
    :param sort_order: 排序方向(asc/desc)
    :return: 分页结果
    """
    # 参数校验
    page = max(1, page)
    page_size = max(1, min(50, page_size))  # 限制最大页大小
    offset = (page - 1) * page_size

    # 构建查询条件
    conditions = [Product.is_deleted == False]

    if category_id:
        conditions.append(Product.category_id == category_id)
    if min_price is not None:
        conditions.append(Product.price >= min_price)
    if max_price is not None:
        conditions.append(Product.price <= max_price)
    if keyword:
        conditions.append(Product.name.like(f"%{keyword}%"))

    # 构建排序规则
    sort_column = getattr(Product, sort_by, Product.create_time)
    sort_func = desc if sort_order.lower() == "desc" else asc

    # 构建查询
    stmt = select(Product).options(
        joinedload(Product.category).load_only(Category.id, Category.name)  # 关联加载分类(只加载必要字段)
    ).where(and_(*conditions)).order_by(sort_func(sort_column)).offset(offset).limit(page_size)

    # 执行查询
    products: Sequence[Product] = db.execute(stmt).scalars().all()

    # 查询总数(复用条件,不包含分页和排序)
    count_stmt = select(func.count(Product.id)).where(and_(*conditions))
    total = db.execute(count_stmt).scalar()

    # 计算分页信息
    total_pages = (total + page_size - 1) // page_size
    has_next = page < total_pages
    has_prev = page > 1

    return Pagination(
        items=products,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
        has_next=has_next,
        has_prev=has_prev
    )


# ====================== 订单管理 ======================
def generate_order_no() -> str:
    """生成唯一订单编号"""
    return datetime.datetime.now().strftime("%Y%m%d%H%M%S") + str(datetime.datetime.now().microsecond)[:4]


def create_order(
        db: Session,
        user_id: int,
        product_items: List[Tuple[int, int]]  # [(product_id, quantity), ...]
) -> Order:
    """
    创建订单(包含事务管理、库存检查、金额计算)
    :param db: 数据库会话
    :param user_id: 用户ID
    :param product_items: 商品列表 [(商品ID, 数量), ...]
    :return: 创建的订单
    """
    if not product_items:
        raise ValueError("订单中至少包含一个商品")

    # 1. 检查用户是否存在
    user = db.execute(select(User).where(User.id == user_id)).scalar_one_or_none()
    if not user:
        raise ValueError(f"用户ID {user_id} 不存在")

    # 2. 检查商品库存并计算总金额
    total_amount = 0.0
    order_items = []

    for product_id, quantity in product_items:
        if quantity <= 0:
            raise ValueError(f"商品ID {product_id} 购买数量必须大于0")

        # 加锁查询商品(防止超卖)
        product = db.execute(
            select(Product).where(and_(Product.id == product_id, Product.is_deleted == False)).with_for_update()
        ).scalar_one_or_none()

        if not product:
            raise ValueError(f"商品ID {product_id} 不存在或已删除")

        if product.stock < quantity:
            raise ValueError(f"商品 {product.name} 库存不足(当前库存:{product.stock},请求数量:{quantity})")

        # 计算金额
        item_amount = product.price * quantity
        total_amount += item_amount

        # 创建订单项
        order_items.append(OrderItem(
            product_id=product_id,
            quantity=quantity,
            unit_price=product.price
        ))

        # 扣减库存,增加销量
        product.stock -= quantity
        product.sales += quantity

    # 3. 创建订单
    order = Order(
        order_no=generate_order_no(),
        user_id=user_id,
        total_amount=total_amount,
        status="pending",
        items=order_items
    )

    try:
        db.add(order)
        db.commit()
        db.refresh(order)

        # 关联加载订单项和商品信息
        db.execute(
            select(Order).options(
                joinedload(Order.items).joinedload(OrderItem.product),
                joinedload(Order.user)
            ).where(Order.id == order.id)
        )

        return order
    except SQLAlchemyError as ex:
        db.rollback()
        raise ex


def get_order_list(
        db: Session,
        page: int = 1,
        page_size: int = 10,
        user_id: int = None,
        status: str = None,
        start_time: datetime.datetime = None,
        end_time: datetime.datetime = None
) -> Pagination:
    """
    分页查询订单列表(支持多条件过滤)
    """
    page = max(1, page)
    page_size = max(1, min(50, page_size))
    offset = (page - 1) * page_size

    # 构建条件
    conditions = [Order.is_deleted == False]

    if user_id:
        conditions.append(Order.user_id == user_id)
    if status:
        conditions.append(Order.status == status)
    if start_time:
        conditions.append(Order.create_time >= start_time)
    if end_time:
        conditions.append(Order.create_time <= end_time)

    # 构建查询(关联加载用户和订单项)
    stmt = select(Order).options(
        joinedload(Order.user).load_only(User.id, User.username, User.phone),
        selectinload(Order.items).joinedload(OrderItem.product).load_only(Product.id, Product.name, Product.price)
    ).where(and_(*conditions)).order_by(desc(Order.create_time)).offset(offset).limit(page_size)

    orders = db.execute(stmt).scalars().all()

    # 查询总数
    count_stmt = select(func.count(Order.id)).where(and_(*conditions))
    total = db.execute(count_stmt).scalar()

    total_pages = (total + page_size - 1) // page_size
    has_next = page < total_pages
    has_prev = page > 1

    return Pagination(
        items=orders,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
        has_next=has_next,
        has_prev=has_prev
    )


def update_order_status(
        db: Session,
        order_id: int,
        status: str,
        payment_time: datetime.datetime = None
) -> Order:
    """
    更新订单状态
    :param db: 数据库会话
    :param order_id: 订单ID
    :param status: 新状态
    :param payment_time: 支付时间(仅status=paid时需要)
    :return: 更新后的订单
    """
    valid_status = ["pending", "paid", "shipped", "completed", "cancelled"]
    if status not in valid_status:
        raise ValueError(f"无效的订单状态,可选值:{valid_status}")

    db_order = db.execute(select(Order).where(Order.id == order_id)).scalar_one_or_none()
    if not db_order:
        raise ValueError(f"订单ID {order_id} 不存在")

    # 如果是取消订单,恢复库存
    if status == "cancelled" and db_order.status != "cancelled":
        for i in db_order.items:
            db_product = db.execute(select(Product).where(Product.id == i.product_id)).scalar()
            if db_product:
                db_product.stock += i.quantity
                db_product.sales -= i.quantity

    # 更新订单状态
    db_order.status = status
    if status == "paid" and payment_time:
        db_order.payment_time = payment_time

    db.commit()
    db.refresh(db_order)
    return db_order

2.2.3.功能测试脚本

项目目录下,创建 test.py 文件:

# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 功能测试脚本
=================================
功能:
    1. 验证数据库表创建
    2. 测试核心CRUD功能(商品创建、订单生成、状态更新)
    3. 验证分页查询、条件过滤、事务回滚等高级特性
    4. 模拟电商业务流程(用户下单→库存扣减→订单状态更新)
测试场景:
    - 商品分类初始化
    - 商品创建与分页查询
    - 订单创建(含库存检查)
    - 订单状态更新与库存恢复(取消订单)
    - 多条件过滤的商品查询
依赖:
    - core.py + crud.py(核心业务层)
    - SQLAlchemy 2.0.48
创建时间:2026-03-22
作者:inuex
版本:v1.0
运行方式:
    python -m mall.test
"""
import datetime

from sqlalchemy import (
    select,
)
from sqlalchemy.exc import SQLAlchemyError

from mall.core import create_tables, Category, Product, User, get_db
from mall.crud import create_product, create_order, get_product_list, update_order_status, get_order_list


# --------------------------- 测试代码 ---------------------------
def main():
    # 获取数据库会话
    db = next(get_db())

    try:
        # 1. 初始化分类
        electronics = db.execute(select(Category).where(Category.name == "电子产品")).scalar_one_or_none()
        if not electronics:
            electronics = Category(name="电子产品", description="手机、电脑、平板等")
            db.add(electronics)
            db.commit()

        # 2. 创建测试商品
        iphone = db.execute(select(Product).where(Product.name == "iPhone 15")).scalar_one_or_none()
        if not iphone:
            iphone = create_product(
                db,
                name="iPhone 15",
                price=5999.0,
                category_id=electronics.id,
                stock=100,
                description="苹果15手机,128G版本"
            )

        macbook = db.execute(select(Product).where(Product.name == "MacBook Pro")).scalar_one_or_none()
        if not macbook:
            macbook = create_product(
                db,
                name="MacBook Pro",
                price=12999.0,
                category_id=electronics.id,
                stock=50,
                description="苹果笔记本电脑,M3芯片"
            )

        # 3. 创建测试用户
        test_user = db.execute(select(User).where(User.username == "test_buyer")).scalar_one_or_none()
        if not test_user:
            test_user = User(
                username="test_buyer",
                phone="13800138000",
                email="test@example.com",
                address="北京市海淀区",
                balance=20000.0
            )
            db.add(test_user)
            db.commit()

        # 4. 创建订单(购买2个iPhone 15 + 1个MacBook Pro)
        try:
            order = create_order(
                db,
                user_id=test_user.id,
                product_items=[(iphone.id, 2), (macbook.id, 1)]
            )
            print(f"创建订单成功:{order}") # 输出: <Order(id=1, order_no=202603221724427528, status=pending, total_amount=24997.0)>
            print(f"订单总金额:{order.total_amount}") # 输出: 24997.0
            print(f"订单项数量:{len(order.items)}") # 输出: 2

            # 打印订单项详情
            for item in order.items:
                print(f"  - 商品:{item.product.name},数量:{item.quantity},单价:{item.unit_price}")
                # 输出:
                # - 商品:iPhone 15,数量:2,单价:5999.0
                # - 商品:MacBook Pro,数量:1,单价:12999.0

        except ValueError as e:
            print(f"创建订单失败:{e}") # 输出:

        # 5. 分页查询商品(测试多条件过滤)
        print("\n=== 分页查询商品(价格区间 5000-15000,按价格升序)===")
        product_pagination = get_product_list(
            db,
            page=1,
            page_size=5,
            min_price=5000.0,
            max_price=15000.0,
            sort_by="price",
            sort_order="asc"
        )
        print(f"总商品数:{product_pagination.total}") # 输出: 2
        print(f"当前页:{product_pagination.page}/{product_pagination.total_pages}") # 输出: 1/1
        print(f"是否有下一页:{product_pagination.has_next}") # 输出: False
        for product in product_pagination.items:
            print(f"  - {product.name},价格:{product.price},分类:{product.category.name}")
            # 输出:
            # - iPhone 15,价格:5999.0,分类:电子产品
            # - MacBook Pro,价格:12999.0,分类:电子产品

        # 6. 分页查询订单
        print("\n=== 分页查询订单 ===")
        order_pagination = get_order_list(
            db,
            page=1,
            page_size=10,
            user_id=test_user.id
        )
        print(f"总订单数:{order_pagination.total}") # 输出: 1
        for order in order_pagination.items:
            print(f"  - 订单编号:{order.order_no},状态:{order.status},金额:{order.total_amount}")
            # 输出:
            # - 订单编号:202603221724427528,状态:pending,金额:24997.0

        # 7. 更新订单状态为已支付
        if order_pagination.items:
            updated_order = update_order_status(
                db,
                order_id=order_pagination.items[0].id,
                status="paid",
                payment_time=datetime.datetime.now(datetime.UTC)
            )
            print(f"\n更新订单状态成功:{updated_order.order_no} -> {updated_order.status}") # 输出: 202603221724427528 -> paid

    except SQLAlchemyError as e:
        db.rollback()
        print(f"数据库错误:{e}")
    except ValueError as e:
        print(f"业务错误:{e}")
    finally:
        db.close()
        print("\n数据库会话已关闭")


if __name__ == "__main__":
    # 创建表
    create_tables()

    # 运行测试
    main()

2.3.核心知识点解析(深入理解 SQLAlchemy)

2.3.1.复杂关系映射
  • 一对多Category -> ProductUser -> Order,通过 ForeignKey + relationship 实现
  • 多对多(隐式)Order <-> Product 通过中间表 OrderItem 实现(更灵活的多对多,可存储额外字段如购买数量、单价)
  • 级联操作cascade="all, delete-orphan" 实现删除主表数据时自动删除关联子表数据
2.3.2.高级查询技巧
  • 条件构建:使用 and_()or_() 组合多条件,支持动态条件拼接
  • 关联加载优化
    • joinedload():左连接加载关联数据(适合一对一 / 一对多)
    • selectinload():IN 查询加载关联数据(适合多对多 / 一对多)
    • load_only():只加载需要的字段,减少数据传输
  • 行锁with_for_update() 实现悲观锁,防止超卖(电商核心场景)
2.3.3.分页功能进阶
  • 通用分页结构:封装 Pagination 类,包含总数、页码、是否有下一页 / 上一页等信息
  • 条件分页:分页查询支持多条件过滤(价格区间、分类、关键词)+ 自定义排序
  • 参数校验:限制页大小范围,防止恶意请求(如 page_size=10000)
2.3.4.事务管理
  • 完整事务:创建订单时包含「库存检查 → 扣减库存 → 创建订单 → 提交事务」完整流程
  • 异常回滚:任何步骤出错都通过 db.rollback() 回滚事务,保证数据一致性
  • 并发控制:使用行锁防止超卖,解决电商核心并发问题
2.3.5.数据操作最佳实践
  • 批量操作add_all() 批量添加数据,减少数据库交互
  • 更新优化:直接更新字段而非查询后修改(如库存扣减)
  • 软删除is_deleted 字段实现逻辑删除,保留数据记录

2.4.总结

  1. 关系建模:电商场景的多对多关系(订单 - 商品)通过中间表实现,比博客系统的简单关系更复杂,能充分理解 SQLAlchemy 的关系映射;
  2. 查询优化:关联加载、字段过滤、行锁等技巧,解决实际业务中的性能和并发问题;
  3. 分页进阶:支持多条件过滤、自定义排序的分页功能,覆盖实际项目中分页的常见需求;
  4. 事务管理:完整的事务流程 + 异常回滚,体现 SQLAlchemy 在数据一致性方面的能力;
  5. 业务结合:将 SQLAlchemy 操作与电商核心业务(订单创建、库存管理、订单状态更新)结合,理解 ORM 在实际项目中的应用方式。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐