SQLAlchemy2 从入门到精通,一篇就够(超详细实战教程)
一、介绍
1.什么是 SQLAlchemy?
SQLAlchemy 是 Python 的 SQL 工具包和对象关系映射器,是 Python 中最流行的 ORM(对象关系映射)工具,它为应用程序开发人员提供了 SQL 的全部功能和灵活性。它提供了一整套众所周知的企业级持久化模式,旨在实现高效且高性能的数据库访问,并已将其改编为简洁且符合 Python 风格的领域语言。
官方主站: https://www.sqlalchemy.org/
官方文档: https://docs.sqlalchemy.org/en/stable/
官网下载页:https://www.sqlalchemy.org/download.html
GitHub 源码库:https://github.com/sqlalchemy/sqlalchemy(可提 Issue、看源码、贡献代码)
官方 Discourse 论坛:https://discuss.sqlalchemy.org/(提问、交流问题的官方社区)
2.核心优势
- 异步非阻塞:适配 FastAPI、Starlette 等异步 Web 框架,提升高并发场景性能;
- API 统一:异步 API 与同步 API 逻辑一致,学习成本低;
- 跨数据库兼容:支持 SQLite、MySQL、PostgreSQL、Oracle 等主流数据库,切换数据库只需修改连接字符串。
- 灵活易用:既支持高层的 ORM 操作,也支持底层的原生 SQL 执行。
- 强大的查询能力:提供丰富的查询 API,支持复杂的筛选、聚合、关联查询。
- 事务支持:完善的事务管理机制,保证数据操作的原子性。
3.核心组件
- Engine:数据库连接引擎,负责管理数据库连接池。
- Session:数据库会话,用于执行 CRUD 操作。
- Declarative Base:模型基类,所有数据模型都继承该类。
- Mapper:将 Python 类映射到数据库表。
- Query:查询对象,用于构建数据库查询。
4.相关单词
| 单词 | 音标 | 中文释义 | 核心使用场景 |
|---|---|---|---|
| SQLAlchemy | /ˌeskjuːˈel ˈælkəmi/ | SQL 炼金术(ORM 框架名) | 框架整体引用、技术栈说明 |
| Session | /ˈseʃn/ | 会话 | 数据库连接会话管理(2.0 核心对象) |
| Model | /ˈmɒdl/(英)/ˈmɑːdl/(美) | 模型 | 定义数据库表映射的类 |
| Engine | /ˈendʒɪn/ | 引擎 | 数据库连接引擎创建 |
| Query | /ˈkwɪəri/(英)/ˈkwɪri/(美) | 查询 | 构建数据库查询语句 |
| Column | /ˈkɒləm/(英)/ˈkɑːləm/(美) | 列 | 定义数据库表字段 |
| Table | /ˈteɪbl/ | 表 | 数据库表对象 / 映射 |
| Relationship | /rɪˈleɪʃnʃɪp/ | 关系 | 定义表之间的关联(一对多 / 多对多) |
| ForeignKey | /ˈfɒrən kiː/(英)/ˈfɔːrən kiː/(美) | 外键 | 建立表之间的引用关系 |
| DeclarativeBase | /dɪˈklærətɪv beɪs/ | 声明式基类 | 2.0 模型继承的核心基类 |
| Mapped | /mæpt/ | 映射 | 2.0 字段类型注解(Mapped []) |
| mapped_column | /mæpt ˈkɒləm/(英)/mæpt ˈkɑːləm/(美) | 映射列 | 2.0 定义字段的核心函数 |
| Select | /sɪˈlekt/ | 选择 | 2.0 构建查询的核心语句(select ()) |
| Commit | /kəˈmɪt/ | 提交 | 事务提交操作 |
| Rollback | /ˈrəʊlbæk/(英)/ˈroʊlbæk/(美) | 回滚 | 事务异常时回滚操作 |
| Filter | /ˈfɪltə/(英)/ˈfɪltər/(美) | 过滤 | 查询时添加条件过滤 |
| Join | /dʒɔɪn/ | 连接 | 多表关联查询(join/joinedload) |
| Pagination | /ˌpædʒɪˈneɪʃn/ | 分页 | 数据分页查询处理 |
| Transaction | /trænˈzækʃn/ | 事务 | 数据库事务管理 |
| Metadata | /ˈmetədeɪtə/ | 元数据 | 数据库表结构的元数据对象 |
| Cascade | /kæˈskeɪd/ | 级联 | 定义关联数据的级联操作(如删除) |
| Scalar | /ˈskeɪlə/ | 标量 | 查询单个结果(scalar ()/scalars ()) |
| Sessionmaker | /ˈseʃn ˌmeɪkə/(英)/ˈseʃn ˌmeɪkər/(美) | 会话工厂 | 创建 Session 的工厂函数 |
| AsyncSession | /eɪˈsɪŋk ˈseʃn/ | 异步会话 | 2.0 异步操作的会话对象 |
| Index | /ˈɪndeks/ | 索引 | 为表字段创建索引 |
| Unique | /juˈniːk/ | 唯一 | 定义唯一约束(UniqueConstraint) |
二、环境准备
1.安装 SQLAlchemy
# 安装核心库
pip install sqlalchemy
# 查看已安装的 sqlalchemy 包的详细信息
pip show sqlalchemy
执行 pip show sqlalchemy 后,你会看到类似这样的输出:
Name: SQLAlchemy
Version: 2.0.48
Summary: Database Abstraction Library
Home-page: https://www.sqlalchemy.org
Author: Mike Bayer
Author-email: mike_mp@zzzcomputing.com
License: MIT
Location: D:\Workspaces\python\test\Test01\.venv\Lib\site-packages
Requires: greenlet, typing-extensions
Required-by:
输出字段解释:
| 字段 | 含义 |
|---|---|
| Name | 包的正式名称(注意 SQLAlchemy 是官方名称,小写 sqlalchemy 是安装名) |
| Version | 已安装的版本号(最常用的字段之一,用于确认版本是否匹配需求) |
| Summary | 包的简短描述(说明这个包是做什么的) |
| Home-page | 包的官方网站 / 文档地址 |
| Author/Author-email | 包的作者和联系方式 |
| License | 开源许可证类型(如 MIT) |
| Location | 包在本地的安装路径(定位包的具体位置) |
| Requires | 该包依赖的其他包(如 sqlalchemy 依赖 greenlet) |
| Required-by | 哪些已安装的包依赖这个包(如 flask-sqlalchemy 依赖 sqlalchemy) |
2.安装数据库驱动
不同数据库需要安装对应的驱动,以下是主流数据库的驱动安装命令:
| 数据库 | 同步驱动安装命令 | 异步驱动安装命令 |
|---|---|---|
| SQLite | 无需安装(Python 内置) | 无需安装 |
| MySQL/MariaDB | pip install pymysql 或 mysql-connector-python |
pip install asyncmy |
| PostgreSQL | pip install psycopg2-binary |
pip install asyncpg |
| Oracle | pip install cx-Oracle |
无官方异步驱动(可使用同步 + 线程) |
3.连接字符串格式
连接字符串是 Engine 的核心参数,格式为:数据库类型+驱动://用户名:密码@主机:端口/数据库名?参数
| 数据库 | 同步连接字符串示例 | 异步连接字符串示例 |
|---|---|---|
| SQLite | sqlite:///test.db(相对路径)sqlite:////绝对路径/test.db |
sqlite+aiosqlite:///test.db |
| MySQL | mysql+pymysql://root:123456@localhost:3306/test |
mysql+asyncmy://root:123456@localhost:3306/test |
| PostgreSQL | postgresql+psycopg2://postgres:123456@localhost:5432/test |
postgresql+asyncpg://postgres:123456@localhost:5432/test |
| Oracle | oracle+cx_oracle://scott:tiger@127.0.0.1:1521/orcl |
无(需用同步驱动 + 异步封装) |
说明:
- SQLite 无需用户名 / 密码,
///表示相对路径,////表示绝对路径; - 异步连接字符串需在数据库类型后加
+异步驱动名(如aiosqlite、asyncmy); - 可添加参数(如
charset=utf8mb4、ssl=true)解决编码 / 安全问题。
三、快速入门(以 SQLite 为例)
1.创建数据库连接
1.1.同步连接
在根目录下,创建 sync_session.py 文件:
# 导入核心模块(2.0+ 推荐的导入方式)
from typing import Generator, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase, Session
from sqlalchemy.pool import QueuePool # 连接池配置
# ====================== 1. 数据库引擎优化(核心性能优化) ======================
# 针对 SQLite 和 SQLAlchemy 2.0+ 的引擎配置优化
# 关键优化点:
# - 关闭 echo(生产环境必须关闭,调试时可开启)
# - 配置连接池(SQLite 虽无真正连接池,但配置可兼容其他数据库)
# - 设置 SQLite 专属优化参数(提升读写性能)
sync_engine = create_engine(
"sqlite:///sync_test.db",
# 基础配置
echo=True, # 生产环境关闭 SQL 打印,提升性能
echo_pool=False, # 关闭连接池日志
# 连接池配置(适配 MySQL/PostgreSQL 等数据库时更重要)
poolclass=QueuePool,
pool_size=5, # 核心连接数
max_overflow=10, # 最大溢出连接数
pool_recycle=3600, # 1小时回收连接,避免失效
pool_pre_ping=True, # 获取连接前检测是否有效,防止死连接
# SQLite 专属优化参数(2.0+ 推荐)
connect_args={
"check_same_thread": False, # 允许多线程访问(SQLite 必要配置)
"timeout": 30, # 数据库锁定超时时间(默认5秒,提升到30秒避免锁死)
},
)
# ====================== 2. 模型基类优化(2.0+ 规范) ======================
# SQLAlchemy 2.0+ 推荐直接继承 DeclarativeBase(替代旧的 declarative_base())
class Base(DeclarativeBase):
"""所有数据模型的基类(2.0+ 规范写法)"""
__abstract__ = True # 标记为抽象类,不会生成数据库表
# ====================== 3. 会话工厂优化(安全性+规范性) ======================
# 2.0+ 推荐显式指定类型,同时保留核心配置
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=sync_engine,
expire_on_commit=False, # 优化:提交后不自动过期对象,提升查询性能
class_=Session, # 显式指定会话类(2.0+ 规范)
)
# ====================== 4. 会话获取函数优化(类型安全+容错性) ======================
def get_db() -> Generator[Session, None, None]:
"""
获取数据库会话生成器(2.0+ 类型安全版本)
特性:
- 类型注解明确,支持 IDE 提示
- 异常捕获并友好提示
- 确保会话最终关闭
"""
db_session: Optional[Session] = None # 初始化变量
try:
db_session = SessionLocal()
yield db_session
except Exception as e:
# 可选:记录日志(推荐生产环境添加)
# import logging; logging.error(f"数据库会话异常: {str(e)}")
db_session.rollback() # 异常时回滚未提交的操作
raise # 重新抛出异常,让上层处理
finally:
if db_session is not None:
db_session.close() # 确保会话无论是否异常都关闭
if __name__ == "__main__":
# 测试示例
db = next(get_db())
print(db) # <sqlalchemy.orm.session.Session object at 0x00000297EFE323C0>
1.2.异步连接
在根目录下,创建 async_session.py 文件:
import asyncio
from typing import AsyncGenerator, Optional
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
AsyncEngine # 显式导入类型
)
from sqlalchemy.orm import DeclarativeBase # 2.0+ 推荐的基类
from sqlalchemy.pool import AsyncAdaptedQueuePool # 异步连接池
# ====================== 1. 配置项抽离(提升可维护性) ======================
# 数据库连接配置(统一管理,方便环境切换)
DATABASE_URL = "sqlite+aiosqlite:///async_test.db"
# 调试开关(建议通过环境变量控制,如 os.getenv("DEBUG", "False") == "True")
DEBUG_MODE = True
# ====================== 2. 异步引擎优化(核心性能+兼容性) ======================
async_engine: AsyncEngine = create_async_engine(
url=DATABASE_URL,
# 基础配置
echo=DEBUG_MODE, # 生产环境关闭,调试时开启
echo_pool=False, # 关闭连接池日志(减少IO开销)
# 异步连接池配置(2.0+ 推荐 AsyncAdaptedQueuePool)
poolclass=AsyncAdaptedQueuePool,
pool_size=5, # 核心连接数(SQLite 无实际连接,适配其他数据库)
max_overflow=10, # 最大溢出连接数
pool_recycle=3600, # 1小时回收连接,避免失效(适配MySQL/PostgreSQL)
pool_pre_ping=True, # 获取连接前检测有效性,防止死连接
# SQLite 专属异步优化参数
connect_args={
"check_same_thread": False, # 解决SQLite线程安全问题
"timeout": 30, # 数据库锁定超时时间(默认5秒,提升稳定性)
},
)
# ====================== 3. 异步会话工厂优化(2.0+ 规范) ======================
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession, # 显式指定异步会话类(2.0+ 规范)
autocommit=False,
autoflush=False,
expire_on_commit=False, # 提交后不失效对象(异步场景减少重复查询)
)
# ====================== 4. 模型基类优化(2.0+ 官方推荐) ======================
# 替代旧的 declarative_base(),2.0+ 推荐直接继承 DeclarativeBase
class Base(DeclarativeBase):
"""所有异步模型的基类(2.0+ 规范写法)"""
__abstract__ = True # 标记为抽象类,不生成数据库表
# ====================== 5. 异步会话获取函数(核心优化,解决原逻辑问题) ======================
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
"""
获取异步数据库会话生成器(适配 FastAPI 依赖注入)
优化点:
1. 移除 async with 与手动 close() 的冲突
2. 取消自动提交(避免业务逻辑未完成时误提交)
3. 完善异常处理和类型注解
4. 确保会话最终关闭,防止连接泄漏
"""
session: Optional[AsyncSession] = None
try:
session = AsyncSessionLocal() # 手动创建会话,替代 async with
yield session # 传递会话给业务逻辑
except Exception as e:
# 异常时回滚所有未提交的操作
if session:
await session.rollback()
raise e # 重新抛出异常,让上层框架(如FastAPI)处理
finally:
# 确保会话最终关闭,释放连接
if session:
await session.close()
# ====================== 6. 可选:创建数据库表的异步函数(通用工具) ======================
async def create_all_tables() -> None:
"""异步创建所有模型对应的数据库表(初始化用)"""
async with async_engine.begin() as conn:
# 异步执行表创建(2.0+ 推荐使用 engine.begin())
await conn.run_sync(Base.metadata.create_all)
# 测试示例(可选)
async def main():
# 初始化数据库表
# await create_all_tables()
# 使用会话
async for db in get_async_db():
# 业务逻辑示例
print(f"会话状态:{db.is_active}") # 输出: 会话状态:True
if __name__ == "__main__":
asyncio.run(main())
2.定义数据模型(映射数据库表)
2.1.定义模型
数据模型类对应数据库表,类属性对应表字段,异步模型定义与同步版完全一致,异步版仅影响 “操作方式”,不影响 “模型结构”
在根目录下,创建 models.py 文件:
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Index
from sqlalchemy.sql import func # 2.0+ 推荐的函数工具
# 注意:Base 需从之前优化后的 DeclarativeBase 子类导入
# 导入 Base 根据同步/异步场景选择
# from async_session import Base
from sync_session import Base
# ====================== 用户表模型(2.0+ 规范写法) ======================
class User(Base):
"""
用户表模型
表名:users
核心字段:ID、用户名、邮箱、年龄、激活状态、创建/更新时间
"""
__tablename__ = "users"
# 1. 核心字段优化(2.0+ 规范 + 约束增强)
id = Column(
Integer,
primary_key=True,
index=True,
comment="用户ID(自增主键)",
autoincrement=True # 显式声明自增(兼容多数据库)
)
username = Column(
String(50),
unique=True,
nullable=False,
comment="用户名(唯一,非空)",
index=True # 高频查询字段加索引,提升查询性能
)
email = Column(
String(100),
unique=True,
nullable=True,
comment="邮箱(唯一,可选)",
index=True # 邮箱查询场景多,添加索引
)
age = Column(
Integer,
default=0,
nullable=False, # 显式声明非空(默认值已保证,增强约束)
comment="年龄(默认0)"
)
is_active = Column(
Boolean,
default=True,
nullable=False,
comment="是否激活(默认True)",
index=True # 状态字段高频过滤,添加索引
)
# 时间字段优化:使用 func.now() 替代 datetime.now(数据库层面生成时间,兼容异步)
create_time = Column(
DateTime,
default=func.now(), # 2.0+ 推荐,数据库服务器时间(更精准)
nullable=False,
comment="创建时间(自动生成)"
)
# 新增更新时间字段(业务常用,2.0+ 支持 onupdate)
update_time = Column(
DateTime,
default=func.now(),
onupdate=func.now(), # 数据更新时自动刷新
nullable=False,
comment="更新时间(自动更新)"
)
# 2. 复合索引(针对多字段查询场景,提升性能)
__table_args__ = (
# 示例:按「激活状态+创建时间」查询的复合索引
Index("idx_user_active_create", "is_active", "create_time"),
)
# 3. 自定义方法优化(类型注解 + 实用方法)
def __repr__(self) -> str:
"""自定义打印格式(类型注解增强)"""
return f"<User(id={self.id}, username='{self.username}', email='{self.email}', is_active={self.is_active})>"
def to_dict(self) -> dict:
"""新增:模型转字典(业务常用,避免手动序列化)"""
return {
"id": self.id,
"username": self.username,
"email": self.email,
"age": self.age,
"is_active": self.is_active,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S") if self.create_time else None,
"update_time": self.update_time.strftime("%Y-%m-%d %H:%M:%S") if self.update_time else None,
}
字段类型说明:
Integer:整型(对应数据库 INT);String(n):字符串型(对应 VARCHAR (n));DateTime:日期时间型(对应 DATETIME);Boolean:布尔型(对应 BOOLEAN/INT)。
约束说明:
primary_key=True:主键;unique=True:唯一约束;nullable=False:非空约束;default=值:默认值;index=True:创建索引(提升查询速度)。
2.2.生成表结构
在根目录下,创建 tables.py 文件:
# 注意:Base 需从之前优化后的 DeclarativeBase 子类导入
# 导入 Base 根据同步/异步场景选择
# from session_async import Base, async_engine
from session_sync import Base, sync_engine
# 导入数据模型(必须在创建表之前导入,这样 SQLAlchemy 才能知道要创建哪些表)
from models import User
# ====================== 表创建逻辑优化(兼容同步/异步) ======================
# 1. 同步场景创建表(2.0+ 规范写法)
def create_tables_sync(engine) -> None:
"""同步创建所有表(仅初始化时执行)"""
# checkfirst=True:默认值,先检查表是否存在,避免重复创建
Base.metadata.create_all(bind=engine, checkfirst=True)
# 2. 异步场景创建表(2.0+ 推荐写法)
async def create_tables_async(engine) -> None:
"""异步创建所有表(适配异步引擎)"""
async with engine.begin() as conn:
# run_sync 适配异步引擎执行同步的 metadata.create_all
await conn.run_sync(Base.metadata.create_all, checkfirst=True)
# ====================== 调用示例 ======================
# 同步场景
create_tables_sync(sync_engine)
# # 异步场景(需在异步上下文执行)
# import asyncio
# asyncio.run(create_tables_async(async_engine))
2.3.查看生成结果
同步生成 sync_test.db

异步生成 async_test.db

3.增删改查(CRUD)
3.1.同步操作
在根目录下,创建 sync_crud.py 文件:
3.1.1.新增数据(Create)
from typing import List, Optional, Sequence
from sqlalchemy import select, update, delete, desc
from sqlalchemy.orm import Session
# 假设 Base、User 模型、get_db 已从优化后的配置文件导入
from models import User
from sync_session import get_db
# ====================== 通用工具函数(提升复用性) ======================
def get_db_session() -> Session:
"""获取数据库会话(封装 get_db,简化调用)"""
return next(get_db())
# ====================== 3.1.1 新增数据(Create)- 2.0+ 优化 ======================
def create_users():
"""新增数据(单个/批量)- 2.0+ 规范写法"""
db: Session = get_db_session()
try:
# 方式1:创建单个对象(添加类型注解)
user1: User = User(username="zhangsan", email="zhangsan@example.com", age=20)
db.add(user1)
# 方式2:批量创建对象(类型注解 + 列表初始化)
batch_users: List[User] = [
User(username="lisi", email="lisi@example.com", age=22),
User(username="wangwu", email="wangwu@example.com", age=25)
]
db.add_all(batch_users)
# 提交事务(2.0+ 推荐先 flush 再 commit,确保获取自增ID)
db.flush() # 预提交,生成ID但不持久化
db.commit() # 最终提交
# 刷新对象(获取数据库自动生成的字段)
db.refresh(user1)
print(f"新增用户ID:{user1.id}") # 输出:新增用户ID:1
except Exception as e:
db.rollback() # 异常回滚
raise RuntimeError(f"新增用户失败:{str(e)}") from e
finally:
db.close() # 确保会话关闭
查询 users 表所有记录:

3.1.2 查询数据(Read)
# ====================== 3.1.2 查询数据(Read)- 2.0+ 核心优化 ======================
def query_users():
"""查询数据 - 2.0+ 推荐使用 select() 构造器(替代旧 query API)"""
db: Session = get_db_session()
try:
# 1. 查询所有用户
stmt = select(User)
users: Sequence[User] = db.scalars(stmt).all()
print("所有用户:", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]
# 2. 查询单个用户(按主键)
user: Optional[User] = db.get(User, 1) # 2.0+ 推荐直接用 session.get()
print("主键为1的用户:", user) # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>
# 3. 条件查询
# 方式1:filter(2.0+ 推荐,支持复杂条件)
stmt = select(User).where(User.username == "zhangsan")
user: Optional[User] = db.scalars(stmt).first()
print("方式1:用户名是zhangsan的用户:", user) # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>
# 方式2:简易条件(2.0+ 无 filter_by,用 where 简化)
stmt = select(User).where(User.username == "zhangsan")
user: Optional[User] = db.scalars(stmt).first()
print("方式2:用户名是zhangsan的用户:", user) # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>
# 4. 多条件查询
stmt = select(User).where(User.age > 20, User.is_active == True)
users: Sequence[User] = db.scalars(stmt).all()
print("年龄>20且激活的用户:", users) # 输出: [<User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]
# 5. 排序查询(2.0+ 用 desc()/asc() 函数)
stmt = select(User).order_by(desc(User.age))
users: Sequence[User] = db.scalars(stmt).all()
print("按年龄降序的用户:", users) # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>]
# 6. 限制查询结果数量(2.0+ 用 limit() 方法)
stmt = select(User).limit(2)
users: Sequence[User] = db.scalars(stmt).all()
print("前2个用户:", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>]
finally:
db.close()
3.1.3 修改数据(Update)
# ====================== 3.1.3 修改数据(Update)- 2.0+ 优化 ======================
def update_users():
"""修改数据(单个/批量)- 2.0+ 规范写法"""
db: Session = get_db_session()
try:
# 1. 修改单个对象(查询-修改-提交)
stmt = select(User).where(User.username == "zhangsan")
user: Optional[User] = db.scalars(stmt).first()
if user:
# 修改属性
user.age = 21
user.email = "zhangsan_new@example.com"
db.commit() # 提交修改
db.refresh(user) # 刷新对象获取最新数据
print("修改后的用户:", user) # 输出: <User(id=1, username='zhangsan', email='zhangsan_new@example.com', is_active=True)>
# 2. 批量修改(2.0+ 新 API:update() 构造器)
stmt = (
update(User)
.where(User.age < 25)
.values(is_active=False) # 2.0+ 用 values() 指定修改字段
.execution_options(synchronize_session="fetch") # 同步会话数据
)
result = db.execute(stmt)
db.commit()
# 输出: 批量修改完成,影响行数:2
print(f"批量修改完成,影响行数:{result.rowcount}") # type: ignore[attr-defined]
except Exception as e:
db.rollback()
raise RuntimeError(f"修改用户失败:{str(e)}") from e
finally:
db.close()
3.1.4 删除数据(Delete)
# ====================== 3.1.4 删除数据(Delete)- 2.0+ 优化 ======================
def delete_users():
"""删除数据(单个/批量)- 2.0+ 规范写法"""
db: Session = get_db_session()
try:
# 1. 删除单个对象
stmt = select(User).where(User.username == "wangwu")
user: Optional[User] = db.scalars(stmt).first()
if user:
db.delete(user)
db.commit()
print("删除用户完成")
# 2. 批量删除(2.0+ 新 API:delete() 构造器)
stmt = (
delete(User)
.where(User.is_active == False)
.execution_options(synchronize_session="fetch")
)
result = db.execute(stmt)
db.commit()
# 输出: 批量删除完成,影响行数:2
print(f"批量删除完成,影响行数:{result.rowcount}") # type: ignore[attr-defined]
except Exception as e:
db.rollback()
raise RuntimeError(f"删除用户失败:{str(e)}") from e
finally:
db.close()
# ====================== 调用示例 ======================
if __name__ == "__main__":
# 执行新增
# create_users()
# 执行查询
# query_users()
# 执行修改
# update_users()
# 执行删除
delete_users()
3.2.异步操作
在根目录下,创建 async_crud.py 文件:
3.2.1.新增数据(Create)
import asyncio
from typing import List, Optional
from sqlalchemy import select, update, delete, desc, Sequence
from sqlalchemy.ext.asyncio import AsyncSession
# 假设从优化后的异步配置文件导入以下依赖
from models import User
# ====================== 通用工具函数(异步) ======================
async def get_async_db_session() -> AsyncSession:
"""获取异步数据库会话(封装 get_async_db,简化调用)"""
# 由于 get_async_db 现在是上下文管理器,我们需要一个不同的实现
# 直接创建会话而不是使用上下文管理器
from async_session import AsyncSessionLocal
session = AsyncSessionLocal()
return session
# ====================== 3.1.1 新增数据(Create)- 异步版 ======================
async def create_users_async():
"""异步新增数据(单个/批量)"""
db: AsyncSession = await get_async_db_session()
try:
# 方式1:创建单个对象
user1: User = User(username="zhangsan", email="zhangsan@example.com", age=20)
db.add(user1)
# 方式2:批量创建对象
batch_users: List[User] = [
User(username="lisi", email="lisi@example.com", age=22),
User(username="wangwu", email="wangwu@example.com", age=25)
]
db.add_all(batch_users)
# 异步提交(2.0+ 异步必须用 await)
await db.flush() # 预提交,生成自增ID
await db.commit() # 最终提交
# 异步刷新对象
await db.refresh(user1)
print(f"新增用户ID:{user1.id}") # 输出:新增用户ID:1
except Exception as e:
await db.rollback() # 异步回滚
raise RuntimeError(f"异步新增用户失败:{str(e)}") from e
finally:
await db.close() # 异步关闭会话
查询 users 表所有记录:

3.2.2 查询数据(Read)
# ====================== 3.1.2 查询数据(Read)- 异步版 ======================
async def query_users_async():
"""异步查询数据(2.0+ 异步 select 构造器)"""
db: AsyncSession = await get_async_db_session()
try:
# 1. 查询所有用户(异步核心:await + scalars + all)
stmt = select(User)
result = await db.scalars(stmt) # 异步执行查询
users: Sequence[User] = result.all()
print("所有用户:",
users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]
# 2. 查询单个用户(按主键,异步 get)
user: Optional[User] = await db.get(User, 1) # 异步 get 方法
print("主键为1的用户:",
user) # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>
# 3. 条件查询
# 方式1:where 条件 + 异步执行
stmt = select(User).where(User.username == "zhangsan")
user: Optional[User] = (await db.scalars(stmt)).first()
print("方式1:用户名是zhangsan的用户:",
user) # 输出: <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>
# 4. 多条件查询
stmt = select(User).where(User.age > 20, User.is_active == True)
users: Sequence[User] = (await db.scalars(stmt)).all()
print("年龄>20且激活的用户:",
users) # 输出: [<User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>]
# 5. 排序查询(2.0+ desc 函数)
stmt = select(User).order_by(desc(User.age))
users: Sequence[User] = (await db.scalars(stmt)).all()
print("按年龄降序的用户:",
users) # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>, <User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>]
# 6. 限制查询结果数量
stmt = select(User).limit(2)
users: Sequence[User] = (await db.scalars(stmt)).all()
print("前2个用户:",
users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com', is_active=True)>, <User(id=2, username='lisi', email='lisi@example.com', is_active=True)>]
finally:
await db.close()
3.2.3 修改数据(Update)
# ====================== 3.1.3 修改数据(Update)- 异步版 ======================
async def update_users_async():
"""异步修改数据(单个/批量)"""
db: AsyncSession = await get_async_db_session()
try:
# 1. 修改单个对象(查询-修改-提交)
stmt = select(User).where(User.username == "zhangsan")
user: Optional[User] = (await db.scalars(stmt)).first()
if user:
user.age = 21
user.email = "zhangsan_new@example.com"
await db.commit() # 异步提交
await db.refresh(user) # 异步刷新
print("修改后的用户:",
user) # 输出: <User(id=1, username='zhangsan', email='zhangsan_new@example.com', is_active=True)>
# 2. 批量修改(2.0+ 异步 update 构造器)
stmt = (
update(User)
.where(User.age < 25)
.values(is_active=False)
.execution_options(synchronize_session="fetch")
)
result = await db.execute(stmt) # 异步执行批量更新
await db.commit()
# 输出: 批量修改完成,影响行数:2
print(f"批量修改完成,影响行数:{result.rowcount}") # type: ignore[attr-defined]
except Exception as e:
await db.rollback()
raise RuntimeError(f"异步修改用户失败:{str(e)}") from e
finally:
await db.close()
3.2.4 删除数据(Delete)
# ====================== 3.1.4 删除数据(Delete)- 异步版 ======================
async def delete_users_async():
"""异步删除数据(单个/批量)"""
db: AsyncSession = await get_async_db_session()
try:
# 1. 删除单个对象
stmt = select(User).where(User.username == "wangwu")
user: Optional[User] = (await db.scalars(stmt)).first()
if user:
await db.delete(user) # 异步删除
await db.commit()
print("删除用户完成")
# 2. 批量删除(2.0+ 异步 delete 构造器)
stmt = delete(User).where(User.is_active == False)
result = await db.execute(stmt) # 异步执行批量删除
await db.commit()
# 输出: 批量删除完成,影响行数:2
print(f"批量删除完成,影响行数:{result.rowcount}") # type: ignore[attr-defined]
except Exception as e:
await db.rollback()
raise RuntimeError(f"异步删除用户失败:{str(e)}") from e
finally:
await db.close()
# ====================== 异步入口函数(执行所有操作) ======================
async def main():
"""异步主函数:按顺序执行所有 CRUD 操作"""
# 1. 新增数据
# await create_users_async()
# 2. 查询数据
# await query_users_async()
# 3. 修改数据
# await update_users_async()
# 4. 删除数据
await delete_users_async()
# ====================== 执行异步代码 ======================
if __name__ == "__main__":
# 异步代码必须在 asyncio 事件循环中执行
asyncio.run(main())
3.3.同步 vs 异步核心差异
3.3.1.核心语法差异(最直观)
| 维度 | 同步操作 | 异步操作 | 关键说明 |
|---|---|---|---|
| 函数定义 | def func(): |
async def func(): |
所有异步操作函数必须标记 async |
| 操作执行 | 直接调用(如 db.commit()) |
需加 await(如 await db.commit()) |
所有数据库 IO 操作必须用 await 挂起 |
| 入口执行 | 直接调用 func() |
asyncio.run(func()) |
异步代码必须在事件循环中执行 |
| 会话获取 | next(get_db()) |
await get_async_db_session() |
异步生成器需 await + 循环获取 |
3.3.2.核心 API 差异(2.0+ 重点)
| 同步 API(旧 / 2.0 兼容) | 异步 API(2.0+ 推荐) | 适用场景 |
|---|---|---|
Session |
AsyncSession |
会话对象类型 |
db.get(User, 1) |
await db.get(User, 1) |
按主键查询 |
db.scalars(stmt).all() |
(await db.scalars(stmt)).all() |
通用查询 |
db.execute(stmt) |
await db.execute(stmt) |
执行 update/delete 语句 |
db.commit() |
await db.commit() |
提交事务 |
db.rollback() |
await db.rollback() |
回滚事务 |
db.delete(user) |
await db.delete(user) |
删除单个对象 |
db.close() |
await db.close() |
关闭会话 |
3.3.3.底层依赖 / 配置差异
| 配置项 | 同步版本 | 异步版本 |
|---|---|---|
| 引擎创建 | create_engine() |
create_async_engine() |
| 会话工厂 | sessionmaker() |
async_sessionmaker() |
| SQLite 驱动 | 内置 sqlite3 |
需安装 aiosqlite(pip install aiosqlite) |
| MySQL/PG 驱动 | pymysql/psycopg2 |
asyncmy/psycopg[async] |
| 连接池 | QueuePool |
AsyncAdaptedQueuePool |
3.3.4.执行逻辑差异
| 特性 | 同步执行 | 异步执行 |
|---|---|---|
| 阻塞方式 | 线程阻塞(等待数据库响应) | 协程挂起(不阻塞线程,可处理其他任务) |
| 并发能力 | 受线程数限制(GIL 影响) | 高并发(单线程可处理上千协程) |
| 异常处理 | try-except 直接捕获 |
try-except 捕获 + await 内执行 |
| 事务控制 | 同步 flush/commit/rollback | 异步 flush/commit/rollback(均需 await) |
四、进阶技巧(同步版)
1.高级查询
异步版高级查询语法与同步版一致,仅需在执行时添加 await,在根目录下创建 sync_query.py 文件:
1.1.定义模型
from typing import Tuple
from sqlalchemy import (
Column, Integer, String, DateTime, Boolean,
or_, and_, not_, func, select, distinct, Sequence
)
from sqlalchemy.orm import Session, DeclarativeBase
from sync_crud import get_db_session
from sync_session import sync_engine
# ====================== 基础模型定义(2.0+ 规范) ======================
class Base(DeclarativeBase):
__abstract__ = True
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="用户ID")
username = Column(String(50), unique=True, nullable=False, index=True, comment="用户名")
email = Column(String(100), unique=True, nullable=True, index=True, comment="邮箱")
age = Column(Integer, default=0, nullable=False, comment="年龄")
is_active = Column(Boolean, default=True, nullable=False, index=True, comment="是否激活")
create_time = Column(DateTime, default=func.now(), nullable=False, comment="创建时间")
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="文章ID")
title = Column(String(100), nullable=False, comment="文章标题")
content = Column(String(500), comment="文章内容")
user_id = Column(Integer, nullable=False, index=True, comment="关联用户ID")
create_time = Column(DateTime, default=func.now(), nullable=False, comment="创建时间")
def __repr__(self) -> str:
return f"<Article(id={self.id}, title='{self.title}', user_id={self.user_id})>"
# ====================== 1.1 初始化表和测试数据 ======================
def init_data():
"""初始化测试数据(仅首次执行)"""
db: Session = get_db_session()
try:
# 创建表
print("正在创建数据库表...")
Base.metadata.create_all(bind=sync_engine)
print("表创建完成")
# 清空旧数据(避免重复)
print("正在清空旧数据...")
db.query(User).delete()
db.query(Article).delete()
db.commit()
print("旧数据已清空")
# 新增用户
print("正在新增用户...")
users = [
User(username="zhangsan", email="zhangsan@example.com", age=20),
User(username="lisi", email="lisi@example.com", age=22),
User(username="wangwu", email="wangwu@example.com", age=25),
User(username="zhaoliu", email=None, age=30) # type: ignore[arg-type]
]
db.add_all(users)
db.commit()
print(f"用户新增完成,共 {len(users)} 个用户")
# 新增文章
print("正在新增文章...")
articles = [
Article(title="Python 入门", content="SQLAlchemy 学习", user_id=1),
Article(title="SQLAlchemy 进阶", content="关联查询", user_id=1),
Article(title="MySQL 优化", content="索引使用", user_id=2)
]
db.add_all(articles)
db.commit()
print(f"文章新增完成,共 {len(articles)} 篇文章")
print("\n=== 测试数据初始化完成 ===\n")
except Exception as e:
db.rollback()
error_msg = f"初始化数据失败:{str(e)}"
print(error_msg)
import traceback
traceback.print_exc()
raise RuntimeError(error_msg) from e
finally:
db.close()
1.2.筛选查询
除基础条件外,SQLAlchemy 支持丰富的筛选操作:
# ====================== 1.2 筛选查询(2.0+ 优化版) ======================
def filter_query():
"""筛选查询(2.0+ select 构造器)"""
db: Session = get_db_session()
try:
# 1. 或条件(or_)
stmt = select(User).where(or_(User.age == 20, User.username == "lisi"))
users: Sequence[User] = db.scalars(stmt).all()
print("或条件(or_): ", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]
# 2. 与条件(and_)
stmt = select(User).where(and_(User.age > 20, User.is_active == True))
users = db.scalars(stmt).all()
print("与条件(and_): ", users) # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]
# 3. 非条件(not_)
stmt = select(User).where(not_(User.age == 20))
users = db.scalars(stmt).all()
print("非条件(not_): ", users) # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]
# 4. 模糊查询(like)
# 以zhang开头
stmt = select(User).where(User.username.like("zhang%"))
users = db.scalars(stmt).all()
print("模糊查询(以zhang开头): ", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>]
# 包含si
stmt = select(User).where(User.username.like("%si%"))
users = db.scalars(stmt).all()
print("模糊查询(包含si): ", users) # 输出: [<User(id=2, username='lisi', email='lisi@example.com')>]
# 5. 范围查询(in_)
stmt = select(User).where(User.age.in_([20, 21, 22]))
users = db.scalars(stmt).all()
print("范围查询(in_): ", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]
# 6. 空值查询
# 邮箱为空
stmt = select(User).where(User.email.is_(None))
users = db.scalars(stmt).all()
print("空值查询(is_): ", users) # 输出: [<User(id=4, username='zhaoliu', email='None')>]
# 邮箱不为空
stmt = select(User).where(User.email.isnot(None))
users = db.scalars(stmt).all()
print("空值查询(isnot): ", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>, <User(id=3, username='wangwu', email='wangwu@example.com')>]
finally:
db.close()
1.2.聚合查询
使用 func 实现计数、求和、平均值等聚合操作:
# ====================== 1.3 聚合查询(2.0+ 优化版) ======================
def aggregate_query():
"""聚合查询(2.0+ func 优化)"""
db: Session = get_db_session()
try:
# 1. 计数(推荐用 scalar() 获取单个值)
stmt = select(func.count(User.id))
total_users: int = db.scalar(stmt)
print("总用户数:", total_users) # 输出: 4
# 2. 求和
stmt = select(func.sum(User.age))
total_age: int = db.scalar(stmt)
print("年龄总和:", total_age) # 输出: 97
# 3. 平均值(转换为浮点数,避免精度丢失)
stmt = select(func.avg(User.age))
avg_age_value = db.scalar(stmt)
avg_age: float = float(avg_age_value) if avg_age_value is not None else 0.0
# 保留 2 位小数
print("平均年龄:", round(avg_age, 2)) # 输出: 24.25
# 4. 最大值/最小值
stmt_max = select(func.max(User.age))
max_age: int = db.scalar(stmt_max)
stmt_min = select(func.min(User.age))
min_age: int = db.scalar(stmt_min)
print("最大年龄:", max_age, "最小年龄:", min_age) # 输出: 最大年龄: 30 最小年龄: 20
# 5. 分组聚合(按激活状态分组)
stmt = select(User.is_active, func.count(User.id)).group_by(User.is_active)
result: Sequence[Tuple[bool, int]] = db.execute(stmt).all()
print("按激活状态分组:", result) # 输出: [(True, 4)]
finally:
db.close()
1.4.关联查询
以「用户 - 文章」一对多为列
# ====================== 1.4 关联查询(2.0+ 优化版) ======================
def join_query():
"""关联查询(2.0+ join/outerjoin 规范写法)"""
db: Session = get_db_session()
try:
# 1. 内连接(INNER JOIN):查询用户及其文章
# 生成的SQL:SELECT users.id, users.username, users.email, users.age, users.is_active, users.create_time, articles.id AS id_1, articles.title, articles.content, articles.user_id, articles.create_time AS create_time_1
# FROM users JOIN articles ON users.id = articles.user_id
stmt = select(User, Article).join(Article, User.id == Article.user_id)
result: Sequence[Tuple[User, Article]] = db.execute(stmt).all()
print("\n内连接结果:")
for user, article in result: # type: ignore[misc]
print(f"用户:{user.username},文章:{article.title}")
# 输出:
# 用户:zhangsan,文章:Python 入门
# 用户:zhangsan,文章:SQLAlchemy 进阶
# 用户:lisi,文章:MySQL 优化
# 2. 左连接(LEFT JOIN):查询所有用户(包括无文章的)
# 生成的SQL:SELECT users.id, users.username, users.email, users.age, users.is_active, users.create_time, articles.id AS id_1, articles.title, articles.content, articles.user_id, articles.create_time AS create_time_1
# FROM users LEFT OUTER JOIN articles ON users.id = articles.user_id
stmt = select(User, Article).outerjoin(Article, User.id == Article.user_id)
result = db.execute(stmt).all()
print("\n左连接结果:")
for user, article in result: # type: ignore[misc]
print(f"用户:{user.username},文章:{article.title if article else '无'}")
# 输出:
# 用户:zhangsan,文章:Python 入门
# 用户:zhangsan,文章:SQLAlchemy 进阶
# 用户:lisi,文章:MySQL 优化
# 用户:wangwu,文章:无
# 用户:zhaoliu,文章:无
finally:
db.close()
1.5.子查询
子查询是嵌套在主查询中的查询,示例:
# ====================== 1.5 子查询(2.0+ 优化版) ======================
def sub_query():
"""子查询(2.0+ scalar_subquery/subquery 规范)"""
db: Session = get_db_session()
try:
# 示例1:查询有文章的用户
# 子查询:所有发布过文章的用户ID(去重)
subquery = select(distinct(Article.user_id)).scalar_subquery()
# 主查询
stmt = select(User).where(User.id.in_(subquery))
users = db.scalars(stmt).all()
print("\n发布过文章的用户:", users) # 输出: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]
# 示例2:查询年龄大于平均年龄的用户
# 子查询:平均年龄
subquery = select(func.avg(User.age)).scalar_subquery()
# 主查询
stmt = select(User).where(User.age > subquery)
users = db.scalars(stmt).all()
print("年龄大于平均年龄的用户:", users) # 输出: [<User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>]
# 示例3:查询发布过2篇以上文章的用户
# 子查询:统计用户文章数(>2)
subquery = (
select(Article.user_id, func.count(Article.id).label("article_count"))
.group_by(Article.user_id)
.having(func.count(Article.id) > 2)
.subquery()
)
# 主查询:关联子查询
stmt = select(User).join(subquery, User.id == subquery.c.user_id)
users = db.scalars(stmt).all()
print("发布过2篇以上文章的用户:", users) # 输出: []
finally:
db.close()
1.6.分页查询
分页是业务中常用的功能,通过 offset(偏移量)和 limit(数量)实现:
# ====================== 1.6 分页查询(2.0+ 优化版) ======================
def pagination_query():
"""分页查询(2.0+ 规范写法,带总页数计算)"""
db: Session = get_db_session()
try:
# 分页参数
page = 1 # 当前页
page_size = 2 # 每页条数
offset = (page - 1) * page_size
# 1. 分页查询数据
stmt = select(User).offset(offset).limit(page_size).order_by(User.id)
users: Sequence[User] = db.scalars(stmt).all()
print(f"\n第{page}页用户:", users) # 输出: 第1页用户: [<User(id=1, username='zhangsan', email='zhangsan@example.com')>, <User(id=2, username='lisi', email='lisi@example.com')>]
# 2. 计算总条数和总页数(优化:一次查询获取总条数)
total_count_stmt = select(func.count(User.id))
total_users: int = db.scalar(total_count_stmt)
total_pages = (total_users + page_size - 1) // page_size # 向上取整
print(f"总用户数:{total_users},总页数:{total_pages}") # 输出: 总用户数:4,总页数:2
# 扩展:分页通用函数
def paginate_query(query_stmt, page_num: int, page_size_num: int): # type: ignore[no-untyped-def]
"""通用分页函数"""
offset_num = (page_num - 1) * page_size_num
# 查询数据
query_data = db.scalars(query_stmt.offset(offset_num).limit(page_size_num)).all()
# 查询总条数
query_total = db.scalar(select(func.count()).select_from(query_stmt.subquery()))
query_total_pages = (query_total + page_size_num - 1) // page_size_num if query_total else 0
return query_data, query_total or 0, query_total_pages
# 测试通用分页函数
data, total, pages = paginate_query(select(User), page_num=2, page_size_num=2)
print(f"第 2 页用户:{data},总条数:{total},总页数:{pages}") # 输出: 第 2 页用户:[<User(id=3, username='wangwu', email='wangwu@example.com')>, <User(id=4, username='zhaoliu', email='None')>],总条数:4,总页数:2
finally:
db.close()
# ====================== 执行所有测试 ======================
if __name__ == "__main__":
# 初始化测试数据(仅首次执行)
init_data()
# 执行筛选查询
filter_query()
# 执行聚合查询
aggregate_query()
# 执行关联查询
join_query()
# 执行子查询
sub_query()
# 执行分页查询
pagination_query()
2.关系映射
SQLAlchemy 提供 relationship 实现表之间的关联关系,无需手动写 JOIN 语句。
2.1.定义模型
from typing import List, Optional
from sqlalchemy import (
Column, Integer, String, ForeignKey, Boolean, Table, func, DateTime
)
from sqlalchemy.orm import (
Session, DeclarativeBase, relationship, Mapped, mapped_column
)
# 假设从优化后的配置导入会话和引擎
from sync_crud import get_db_session
from sync_session import sync_engine
# ====================== 基础基类(2.0+ 规范) ======================
class Base(DeclarativeBase):
"""所有模型的基类(2.0+ 推荐 DeclarativeBase)"""
__abstract__ = True
# 通用字段:创建/更新时间(所有表复用)
create_time = mapped_column(DateTime, default=func.now(), nullable=False, comment="创建时间")
update_time = mapped_column(DateTime, default=func.now(), onupdate=func.now(), nullable=False, comment="更新时间")
# ====================== 2.2 一对多(用户 - 文章) ======================
# 推荐:先定义被关联模型(Article),避免循环引用;或使用字符串引用
class Article(Base):
__tablename__ = "articles"
# 2.0+ 推荐使用 mapped_column 替代旧 Column 写法(类型注解更友好)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="文章ID")
title: Mapped[str] = mapped_column(String(100), nullable=False, index=True, comment="文章标题")
content: Mapped[Optional[str]] = mapped_column(String(500), comment="文章内容")
user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id", ondelete="CASCADE"), comment="关联用户ID")
# 多对一关联:多个文章属于一个用户
author: Mapped["User"] = relationship(
"User",
back_populates="articles",
lazy="selectin" # 2.0+ 推荐的加载策略,避免N+1查询
)
def __repr__(self) -> str:
return f"<Article(id={self.id}, title='{self.title}', user_id={self.user_id})>"
# ====================== 2.4 多对多(用户 - 角色) ======================
# 中间表(2.0+ 规范写法)
user_role = Table(
"user_role",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id", ondelete="CASCADE"), primary_key=True, comment="用户ID"),
Column("role_id", Integer, ForeignKey("roles.id", ondelete="CASCADE"), primary_key=True, comment="角色ID"),
# 新增中间表通用字段(可选)
Column("create_time", DateTime, default=func.now(), nullable=False, comment="关联创建时间")
)
class Role(Base):
__tablename__ = "roles"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="角色ID")
name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, comment="角色名称")
# 多对多关联:一个角色对应多个用户
users: Mapped[List["User"]] = relationship(
"User",
secondary=user_role, # 指定中间表
back_populates="roles",
cascade="all",
lazy="selectin"
)
class User(Base):
__tablename__ = "users"
# 2.0+ 新写法:Mapped + mapped_column(类型提示更精准)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="用户ID")
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True, comment="用户名")
email: Mapped[Optional[str]] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")
# 一对多关联:一个用户有多个文章
articles: Mapped[List["Article"]] = relationship(
"Article",
back_populates="author",
cascade="all, delete-orphan", # 级联删除:删用户时删文章,删文章自动解除关联
passive_deletes=True, # 配合外键 ondelete="CASCADE",提升删除性能
lazy="selectin" # 预加载关联数据,避免N+1查询
)
# 一对一关联:uselist=False 表示非列表(一对一)
profile: Mapped[Optional["UserProfile"]] = relationship(
"UserProfile",
back_populates="user",
uselist=False,
cascade="all, delete-orphan",
passive_deletes=True,
lazy="selectin"
)
# 多对多关联:一个用户对应多个角色
roles: Mapped[List["Role"]] = relationship(
"Role",
secondary=user_role,
back_populates="users",
cascade="all",
lazy="selectin"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
# ====================== 2.3 一对一(用户 - 用户详情) ======================
class UserProfile(Base):
__tablename__ = "user_profiles"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="详情ID")
address: Mapped[Optional[str]] = mapped_column(String(200), comment="地址")
phone: Mapped[Optional[str]] = mapped_column(String(20), comment="手机号")
# 唯一外键:保证一对一关系
user_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("users.id", ondelete="CASCADE"),
unique=True,
comment="关联用户ID"
)
# 一对一关联:一个详情属于一个用户
user: Mapped["User"] = relationship(
"User",
back_populates="profile",
lazy="selectin"
)
# ====================== 2.1 工具函数:初始化表和测试数据 ======================
def init_relationship_tables():
"""初始化关系表(删除旧表+创建新表)"""
db: Session = get_db_session()
try:
# 先删除旧表(按依赖顺序)
Base.metadata.drop_all(bind=sync_engine)
# 创建新表
Base.metadata.create_all(bind=sync_engine)
print("关系表初始化完成")
finally:
db.close()
查询生成的表:

2.2.一对多(用户 - 文章)
# ====================== 2.2 一对多使用示例 ======================
def one_to_many():
"""测试一对多关系(修复PyCharm报错+规范写法)"""
db: Session = get_db_session()
try:
# 1. 新增用户+文章(修复PyCharm参数报错:使用类型注解+分步创建)
# 方式1:推荐写法(分步创建,无IDE报错)
new_user = User(username="tianqi", email="tianqi@example.com")
# 创建文章并关联作者(推荐)
article1 = Article(title="SQLAlchemy关系映射", content="一对多示例", author=new_user)
article2 = Article(title="PythonORM", content="ORM入门", author=new_user)
db.add_all([new_user, article1, article2])
db.flush() # 预提交,生成ID
db.commit()
print("新增用户+文章完成")
# 2. 查询用户的所有文章(2.0+ select 构造器)
from sqlalchemy import select
stmt = select(User).where(User.username == "tianqi")
user = db.scalars(stmt).first()
if user:
print(f"\n{user.username}的文章:", [art.title for art in user.articles]) # 输出: tianqi的文章: ['SQLAlchemy关系映射', 'PythonORM']
# 3. 查询文章的作者
stmt = select(Article).where(Article.title == "SQLAlchemy关系映射")
article = db.scalars(stmt).first()
if article:
print(f"{article.title}的作者:", article.author.username) # 输出: SQLAlchemy关系映射的作者: tianqi
except Exception as e:
db.rollback()
raise RuntimeError(f"一对多测试失败:{str(e)}") from e
finally:
db.close()
参数说明:
back_populates:反向关联(对应另一张表的关系字段);cascade="all, delete-orphan":级联操作(删除用户时自动删除其文章);ForeignKey("users.id"):外键约束(关联用户表的 id 字段)。

2.3.一对一(用户 - 用户详情)
# ====================== 2.3 一对一使用示例) ======================
def one_to_one():
"""测试一对一关系"""
db: Session = get_db_session()
try:
# 1. 查询用户
from sqlalchemy import select
stmt = select(User).where(User.username == "tianqi")
user = db.scalars(stmt).first()
if user:
# 2. 新增/更新用户详情
if user.profile:
db.delete(user.profile)
db.flush()
user.profile = UserProfile(address="北京市海淀区", phone="13800138000")
db.commit()
# 输出: tianqi的地址: 北京市海淀区
print(f"\n{user.username}的地址:", user.profile.address) # type: ignore
# 输出: tianqi的手机号: 13800138000
print(f"{user.username}的手机号:", user.profile.phone) # type: ignore
except Exception as e:
db.rollback()
raise RuntimeError(f"一对一测试失败:{str(e)}") from e
finally:
db.close()
执行后,查询结果:

2.4.多对多(用户 - 角色)
多对多需要中间表,示例:
# ====================== 2.4 多对多使用示例(优化版) ======================
def many_to_many():
"""测试多对多关系"""
db: Session = get_db_session()
try:
# 1. 新增角色
admin_role = Role(name="admin")
guest_role = Role(name="guest")
db.add_all([admin_role, guest_role])
db.commit()
# 2. 给用户分配角色
from sqlalchemy import select
stmt = select(User).where(User.username == "tianqi")
user = db.scalars(stmt).first()
if user:
user.roles.append(admin_role)
user.roles.append(guest_role)
db.commit()
# 3. 查询用户的角色
print(f"\n{user.username}的角色:", [role.name for role in user.roles]) # 输出: tianqi的角色: ['admin', 'guest']
# 4. 查询角色下的用户
stmt = select(Role).where(Role.name == "admin")
role = db.scalars(stmt).first()
if role:
print(f"{role.name}角色下的用户:", [u.username for u in role.users]) # 输出: admin角色下的用户: ['tianqi']
except Exception as e:
db.rollback()
raise RuntimeError(f"多对多测试失败:{str(e)}") from e
finally:
db.close()
# ====================== 执行所有测试 ======================
if __name__ == "__main__":
# 1. 初始化表
init_relationship_tables()
# 2. 测试一对多
one_to_many()
# 3. 测试一对一
one_to_one()
# 4. 测试多对多
many_to_many()
执行后,查询结果:

3.执行原生 SQL
虽然 ORM 很方便,但有时需要执行原生 SQL(如复杂查询、存储过程):
from sqlite3 import Row
from typing import List, Tuple, Optional, Sequence
from sqlalchemy import Integer, String, text
from sqlalchemy.engine import Result # 2.0+ 结果类型注解
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
# 定义数据库引擎/会话
from sync_crud import get_db_session
from sync_session import sync_engine
# ====================== 基础配置 ======================
class Base(DeclarativeBase):
__abstract__ = True
# ====================== User 模型优化 ======================
class User(Base):
__tablename__ = "users"
# 2.0+ 新写法:Mapped + mapped_column(类型提示更精准)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True, comment="用户ID")
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True, comment="用户名")
email: Mapped[Optional[str]] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
age: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="年龄")
balance: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="余额")
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}', age={self.age}, balance={self.balance})>"
# ====================== 工具函数:初始化表和测试数据 ======================
def init_tables():
"""初始化关系表(删除旧表+创建新表)"""
db: Session = get_db_session()
try:
# 先删除旧表(按依赖顺序)
Base.metadata.drop_all(bind=sync_engine)
# 创建新表
Base.metadata.create_all(bind=sync_engine)
print("关系表初始化完成")
finally:
db.close()
# ====================== 核心:原生SQL操作(2.0+ 优化版) ======================
def execute_sql() -> None:
"""执行原生SQL操作"""
db: Session = get_db_session() # 获取会话(封装为可复用函数)
try:
# 1. 单条插入数据(获取插入后的自增ID)
single_insert_sql = text("""
INSERT INTO users (username, email, age, balance)
VALUES (:username, :email, :age, :balance)
""")
single_insert_data = {"username": "wushi", "email": "wushi@example.com", "age": 30, "balance": 100}
# MySQL 下获取自增ID(不同数据库语法不同)
if sync_engine.dialect.name == "mysql":
db.execute(single_insert_sql, single_insert_data)
last_id = db.execute(text("SELECT LAST_INSERT_ID()")).scalar()
db.commit()
print(f"单条插入完成,自增ID:{last_id}")
# 2. 批量插入数据(2.0+ 优化:executemany 模式,性能更高)
batch_insert_sql = text("""
INSERT INTO users (username, email, age, balance)
VALUES (:username, :email, :age, :balance)
""")
batch_data: List[dict] = [
{"username": "zhangsan", "email": "zhangsan@example.com", "age": 20, "balance": 100},
{"username": "lisi", "email": "lisi@example.com", "age": 22, "balance": 100},
{"username": "wangwu", "email": "wangwu@example.com", "age": 25, "balance": 100}
]
# 2.0+ 推荐使用 executemany_mode="values" 优化批量插入
result: Result = db.execute(
batch_insert_sql,
batch_data,
execution_options={"executemany_mode": "values"} # 批量插入优化
)
db.commit()
# 批量创建数据完成,影响行数:3
print(f"批量创建数据完成,影响行数:{result.rowcount}") # type: ignore[attr-defined]
# 3. 执行查询
query_sql = text("SELECT username, age FROM users WHERE age > :age")
query_result: Result = db.execute(query_sql, {"age": 20})
all_results: Sequence[Row] = query_result.fetchall()
print("原生 SQL 查询结果:", all_results) # 输出: [('lisi', 22), ('wangwu', 25)]
# 4. 执行单条查询(返回单个元组,2.0+ 空值处理优化)
single_sql = text("SELECT username FROM users WHERE id = :id")
single_result: Optional[Tuple[str]] = db.execute(single_sql, {"id": 1}).fetchone()
if single_result:
print("单个结果:", single_result[0]) # 输出: zhangsan
else:
print("未找到 ID 为 1 的用户")
# 5. 执行更新操作
update_sql = text("UPDATE users SET age = :age WHERE username = :username")
update_result: Result = db.execute(update_sql, {"age": 23, "username": "zhangsan"})
db.commit()
# 使用rowcount属性
update_row_count = update_result.rowcount # type: ignore[attr-defined]
print(f"原生 SQL 修改完成,影响行数:{update_row_count}") # 输出: 1
except Exception as e:
db.rollback() # 异常回滚,保证数据一致性
raise RuntimeError(f"原生SQL操作失败:{str(e)}") from e # 保留异常栈
finally:
db.close() # 确保会话关闭(2.0+ 同步会话必须显式关闭)
# ====================== 执行入口 ======================
if __name__ == "__main__":
# 1. 初始化表
init_tables()
# 2. 执行原生SQL操作
execute_sql()
4.事务处理
事务保证多个操作要么全部成功,要么全部失败,SQLAlchemy 会话默认开启事务:
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
# ====================== 核心:转账事务(2.0+ 优化版) ======================
def transfer_money(
db: Session,
from_username: str,
to_username: str,
amount: int
) -> Optional[tuple[bool, str]]:
"""
安全的转账事务操作(2.0+ 规范)
:param db: 数据库会话
:param from_username: 转出用户名
:param to_username: 转入用户名
:param amount: 转账金额(正数)
:return: (是否成功, 提示信息)
"""
# 前置校验:金额合法性
if amount <= 0:
return False, "转账金额必须大于0"
# 模拟异常
if amount == 100:
raise Exception("模拟异常")
try:
# 1. 2.0+ 推荐使用 select 构造器(替代旧 query())
stmt_a = select(User).where(User.username == from_username)
stmt_b = select(User).where(User.username == to_username)
# 2. 悲观锁:锁定行,避免并发修改(关键!防止超卖/余额不一致)
# for update:2.0+ 兼容,锁定查询到的行直到事务结束
user_a: Optional[User] = db.scalars(stmt_a.with_for_update()).first()
user_b: Optional[User] = db.scalars(stmt_b.with_for_update()).first()
# 3. 业务校验:用户存在性 + 余额充足性
if not user_a:
return False, f"转出用户 {from_username} 不存在"
if not user_b:
return False, f"转入用户 {to_username} 不存在"
if user_a.balance < amount:
return False, f"用户 {from_username} 余额不足(当前:{user_a.balance},需转出:{amount})"
# 4. 原子性更新操作
user_a.balance -= amount
user_b.balance += amount
# 5. 可选:批量更新(替代对象修改,性能更高)
# 适用于高并发场景,直接执行UPDATE语句,减少ORM对象操作
# stmt_update_a = update(User).where(User.username == from_username).values(balance=User.balance - amount)
# stmt_update_b = update(User).where(User.username == to_username).values(balance=User.balance + amount)
# db.execute(stmt_update_a)
# db.execute(stmt_update_b)
# 6. 提交事务(2.0+ 同步提交,无需await)
db.commit()
# 7. 刷新对象,获取最新数据(可选)
db.refresh(user_a)
db.refresh(user_b)
return True, (
f"转账成功!\n"
f"{from_username} 余额:{user_a.balance}(原:{user_a.balance + amount})\n"
f"{to_username} 余额:{user_b.balance}(原:{user_b.balance - amount})"
)
except SQLAlchemyError as e:
# 2.0+ 专用异常捕获:仅捕获数据库相关异常,避免捕获所有异常
db.rollback()
return False, f"转账失败(数据库异常):{str(e)}"
except Exception as e:
# 其他业务异常
db.rollback()
return False, f"转账失败(业务异常):{str(e)}"
finally:
# 可选:关闭会话(若会话是函数内创建,否则由调用方管理)
# db.close()
pass
# ====================== 执行入口 ======================
if __name__ == "__main__":
"""执行原生SQL操作"""
db: Session = get_db_session() # 获取会话(封装为可复用函数)
# 测试1:正常转账
success, msg = transfer_money(db, "zhangsan", "lisi", 10)
print("\n测试1 - 正常转账:", success, msg)
# 输出:
# 测试1 - 正常转账: True 转账成功!
# zhangsan 余额:90(原:100)
# lisi :110(原:100)
# 测试2:余额不足(触发回滚)
success, msg = transfer_money(db, "zhangsan", "lisi", 1000)
print("\n测试2 - 余额不足:", success, msg)
# 输出: 测试2 - 余额不足: False 用户 zhangsan 余额不足(当前:90,需转出:1000)
# 测试3:用户不存在(触发回滚)
success, msg = transfer_money(db, "alice", "lisi", 10)
print("\n测试3 - 用户不存在:", success, msg)
# 输出: 测试3 - 用户不存在: False 转出用户 alice 不存在
# 测试4:手动触发异常(模拟业务报错)
# 可取消注释测试:在transfer_money中添加 raise Exception("模拟异常")
# success, msg = transfer_money(db, "zhangsan", "lisi", 100)
# print("\n测试4 - 手动异常:", success, msg)
事务特性(ACID):
- 原子性(Atomicity):操作不可分割;
- 一致性(Consistency):数据状态一致;
- 隔离性(Isolation):多个事务互不干扰;
- 持久性(Durability):提交后数据永久保存。

5.优化查询
5.1.延迟加载 vs 立即加载
- 延迟加载(默认):访问关联对象时才查询数据库(N+1 查询问题);
- 立即加载(joinedload):通过 JOIN 一次性查询所有数据。
# ====================== 5.1 加载策略优化(解决N+1查询) ======================
def optimize_loading_strategy(db: Session):
"""2.0+ 加载策略优化(joinedload/selectinload)"""
print("=== 5.1 加载策略优化 ===")
# 问题:N+1查询(2.0+ 仍存在,需显式优化)
print("\n【N+1查询示例】")
users: List[User] = db.scalars(select(User)).all()
for user in users[:2]: # 仅演示前2个用户
# 访问articles时触发新查询(N+1问题)
print(f"用户 {user.username} 的文章数:{len(user.articles)}")
# 优化1:joinedload(左连接,一次性加载,适合一对一/一对多)
print("\n【joinedload优化(左连接)】")
stmt = select(User).options(joinedload(User.articles))
users = db.scalars(stmt).all()
for user in users[:2]:
# 无额外查询,直接访问关联数据
print(f"用户 {user.username} 的文章:{[art.title for art in user.articles[:1]]}")
# 优化2:selectinload(IN查询,适合多对多/大数据量一对多)
print("\n【selectinload优化(IN查询)】")
stmt = select(User).options(selectinload(User.roles))
users = db.scalars(stmt).all()
for user in users[:2]:
print(f"用户 {user.username} 的角色:{[role.name for role in user.roles]}")
# 扩展:2.0+ 高级加载策略
# 1. lazyload:强制延迟加载(覆盖模型默认)
stmt = select(User).options(lazyload(User.articles))
# 2. contains_eager:配合手动JOIN,复用已有连接
stmt = select(User).join(User.articles).options(contains_eager(User.articles))
5.2.只查询指定字段
避免查询所有字段,提升性能:
# ====================== 5.2 只查询指定字段 ======================
def optimize_selected_fields(db: Session):
"""字段筛选优化(减少数据传输)"""
# 基础写法:只查询指定字段(返回元组)
print("\n【基础字段筛选】")
stmt = select(User.username, User.email)
result: Result = db.execute(stmt)
user_fields: List[Tuple[str, str]] = result.all()
for username, email in user_fields[:3]:
print(f"用户名:{username},邮箱:{email}")
# 进阶:映射为字典(更易用)
print("\n【字段筛选+映射字典】")
stmt = select(
User.username.label("name"),
User.age.label("user_age")
).where(User.age > 18)
result = db.execute(stmt)
# 转为字典列表(2.0+ Result对象支持mappings())
user_dicts = [dict(row) for row in result.mappings()]
for user in user_dicts[:3]:
print(f"姓名:{user['name']},年龄:{user['user_age']}")
# 性能优化:count查询仅查主键(避免count(*))
print("\n【count优化】")
total = db.scalar(select(func.count(User.id))) # 比count(*)更快
print(f"总用户数:{total}")
5.3.使用索引
在频繁查询的字段上创建索引:
# User模型
class User(Base):
__tablename__ = "users"
# 基础字段 + 单字段索引
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, index=True)
username: Mapped[str] = mapped_column(
String(50), unique=True, nullable=False, index=True, comment="用户名(索引)"
)
email: Mapped[str] = mapped_column(
String(100), unique=True, nullable=False, index=True, comment="邮箱(唯一索引)"
)
age: Mapped[int] = mapped_column(Integer, default=0, comment="年龄")
is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")
balance: Mapped[int] = mapped_column(Integer, default=0, comment="余额")
# 关联关系(默认lazy="select",即延迟加载)
articles: Mapped[List[Article]] = relationship("Article", back_populates="author")
roles: Mapped[List[Role]] = relationship("Role", secondary=user_role, back_populates="users")
# 2.0+ 复合索引(生产级优化:覆盖高频查询条件)
__table_args__ = (
# 复合索引:age + is_active(匹配 WHERE age > ? AND is_active = ?)
Index("idx_age_active", "age", "is_active"),
# 唯一索引(避免重复)
Index("idx_username_unique", "username", unique=True),
# 部分索引(仅对激活用户生效,SQLite不支持,MySQL/PG支持)
# Index("idx_active_email", "email", postgresql_where=(is_active == True)),
)
# ====================== 5.3 索引优化(2.0+ 规范) ======================
def index_optimization_demo():
"""索引优化说明(模型定义规范)"""
print("\n=== 5.3 索引优化 ===")
print("✅ 单字段索引:username/email/index=True(高频查询字段)")
print("✅ 复合索引:idx_age_active(匹配多字段查询条件)")
print("✅ 唯一索引:username/email(避免重复数据)")
print("✅ 部分索引:仅对激活用户生效(MySQL/PG支持,SQLite不支持)")
print("❌ 避免过度索引:更新频繁的字段(如balance)不建索引")
5.4.批量操作
避免循环单条操作,使用批量增删改:
# ====================== 5.4 批量操作(2.0+ 性能优化) ======================
def batch_operations_optimization(db: Session):
"""批量操作优化(减少数据库交互)"""
# 1. 批量插入
print("\n【批量插入】")
batch_users = [
User(username=f"user{i}", email=f"user{i}@example.com", age=18+i)
for i in range(1, 4)
]
db.add_all(batch_users)
db.commit()
print(f"批量插入 {len(batch_users)} 个用户完成")
# 2. 批量更新
print("\n【批量更新】")
stmt = update(User).where(User.age < 20).values(is_active=False)
result = db.execute(stmt)
db.commit()
print(f"批量更新 {result.rowcount} 条记录(年龄<20的用户设为非激活)")
# 3. 批量删除
print("\n【批量删除】")
stmt = delete(User).where(User.is_active == False)
result = db.execute(stmt)
db.commit()
print(f"批量删除 {result.rowcount} 条记录(非激活用户)")
# 批量插入优化(executemany_mode)
# 适用于MySQL,转为 VALUES (...), (...) 格式
# db.execute(
# insert(User),
# [{"username": "u1", "email": "u1@com"}, ...],
# execution_options={"executemany_mode": "values"}
# )
五、实战案例
1.博客系统
1.1.需求分析
实现一个简易博客系统,包含以下功能:
- 用户管理(注册、查询、修改、删除);
- 文章管理(发布、查询、修改、删除);
- 评论管理(新增、查询、删除);
- 角色权限(管理员 / 普通用户)。
项目结构:
D:\Workspaces\python\test\Test01\blog
│ blog.db
│ core.py
│ crud.py
│ __init__.py
1.2.数据模型设计
项目目录下,创建 core.py 文件:
import datetime
from typing import List, Optional, Generator
# SQLAlchemy 2.0 核心导入
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
Integer,
String,
create_engine,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship,
sessionmaker,
Session,
)
# --------------------------- 基础配置 ---------------------------
class Base(DeclarativeBase):
"""所有模型的基类"""
# 为所有模型添加通用的创建时间字段
__abstract__ = True # 抽象基类,不会创建表
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
create_time: Mapped[datetime.datetime] = mapped_column(
DateTime, default=lambda: datetime.datetime.now(datetime.UTC), comment="创建时间"
)
# 数据库连接配置
DATABASE_URL = "sqlite:///./blog.db"
engine = create_engine(
DATABASE_URL,
# check_same_thread=False 是 SQLite 专属配置:
# 允许同一个数据库连接被多个线程共享使用,适配 SQLAlchemy 连接池的多线程场景;
# 解除「连接必须和创建它的线程绑定」的限制,避免 ProgrammingError 报错;
connect_args={
"check_same_thread": False, # 关闭 SQLite 的线程检查机制
"uri": True # 启用URI模式
},
# 或使用文件锁保证写安全
execution_options={"isolation_level": "SERIALIZABLE"},
echo=True, # 设为 True 可打印 SQL 语句,便于调试
)
# 创建会话工厂
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False, # 提交后不立即过期对象
)
# --------------------------- 数据模型 ---------------------------
class Role(Base):
"""角色模型"""
__tablename__ = "roles"
name: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="角色名称")
description: Mapped[Optional[str]] = mapped_column(String(200), nullable=True, comment="角色描述")
# 多对多关联用户
users: Mapped[List["User"]] = relationship(
"User", secondary="user_roles", back_populates="roles"
)
def __repr__(self) -> str:
return f"<Role(id={self.id}, name={self.name})>"
class UserRole(Base):
"""用户-角色 多对多中间表"""
__tablename__ = "user_roles"
__table_args__ = {"comment": "用户角色关联表"} # 表注释
# 复合主键
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id"), primary_key=True, comment="用户ID"
)
role_id: Mapped[int] = mapped_column(
ForeignKey("roles.id"), primary_key=True, comment="角色ID"
)
class User(Base):
"""用户模型"""
__tablename__ = "users"
__table_args__ = {"comment": "用户表"}
username: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="用户名")
email: Mapped[str] = mapped_column(String(100), unique=True, index=True, comment="邮箱")
password: Mapped[str] = mapped_column(String(100), comment="密码(建议加密存储)")
is_active: Mapped[bool] = mapped_column(
Boolean, default=True, comment="是否激活"
)
age: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, comment="年龄")
# 关联角色(多对多)
roles: Mapped[List[Role]] = relationship(
"Role", secondary="user_roles", back_populates="users"
)
# 关联文章(一对多)
articles: Mapped[List["Article"]] = relationship(
"Article", back_populates="author", cascade="all, delete-orphan"
)
# 关联评论(一对多)
comments: Mapped[List["Comment"]] = relationship(
"Comment", back_populates="author", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username={self.username}, email={self.email})>"
class Article(Base):
"""文章模型"""
__tablename__ = "articles"
__table_args__ = {"comment": "文章表"}
title: Mapped[str] = mapped_column(String(100), index=True, comment="文章标题")
content: Mapped[str] = mapped_column(String, comment="文章内容")
read_count: Mapped[int] = mapped_column(
Integer, default=0, comment="阅读量"
)
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id"), comment="作者ID"
)
# 关联作者
author: Mapped["User"] = relationship(
"User", back_populates="articles"
)
# 关联评论(一对多,删除文章时级联删除评论)
comments: Mapped[List["Comment"]] = relationship(
"Comment", back_populates="article", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Article(id={self.id}, title={self.title[:20]}...)>"
class Comment(Base):
"""评论模型"""
__tablename__ = "comments"
__table_args__ = {"comment": "评论表"}
content: Mapped[str] = mapped_column(String(500), comment="评论内容")
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id"), comment="评论用户ID"
)
article_id: Mapped[int] = mapped_column(
ForeignKey("articles.id"), comment="关联文章ID"
)
# 关联用户
author: Mapped["User"] = relationship(
"User", back_populates="comments"
)
# 关联文章
article: Mapped["Article"] = relationship(
"Article", back_populates="comments"
)
def __repr__(self) -> str:
return f"<Comment(id={self.id}, content={self.content[:20]}...)>"
# --------------------------- 数据库操作工具函数 ---------------------------
def create_tables() -> None:
"""创建所有表"""
try:
Base.metadata.create_all(bind=engine)
print("数据库表创建成功!")
except SQLAlchemyError as ex:
print(f"创建表失败:{ex}")
raise
def get_db() -> Generator[Session, None, None]:
"""
获取数据库会话(依赖注入风格)
2.0 版本推荐使用 Session 类型提示
"""
db_session: Session = SessionLocal()
try:
yield db_session
except SQLAlchemyError as ex:
db_session.rollback() # 异常时回滚
print(f"数据库操作异常:{ex}")
raise
finally:
db_session.close()
# --------------------------- 测试代码 ---------------------------
if __name__ == "__main__":
# 创建所有表
create_tables()
# 测试代码
# try:
# # with语句自动管理会话,结束后自动关闭
# with SessionLocal() as db:
# print("数据库会话创建成功!")
#
# admin_role = db.query(Role).filter(Role.name == "admin").first()
# if not admin_role:
# admin_role = Role(name="admin", description="管理员")
# db.add(admin_role)
# db.commit()
# db.refresh(admin_role)
# print(f"添加测试角色成功:{admin_role}")
# else:
# print(f"admin角色已存在:{admin_role}")
# except Exception as e:
# print(f"测试过程出错:{e}")
执行上述代码,会自动创建 blog.db 数据库文件,查询生成的表结构:

1.3.初始化数据(SQL 脚本)
-- 插入角色
INSERT INTO roles (name, description) VALUES
('admin', '系统管理员,拥有所有权限'),
('user', '普通用户,只能操作自己的内容');
-- 插入管理员用户(密码:123456,实际项目需加密)
INSERT INTO users (username, email, password, is_active) VALUES
('admin', 'admin@example.com', '123456', 1);
-- 给管理员分配角色
INSERT INTO user_role (user_id, role_id) VALUES (1, 1);
执行后,查询生成的表数据:

1.4.核心功能实现
项目目录下,创建 crud.py 文件:
import os
import sys
from dataclasses import dataclass
from typing import Dict, Optional, Any, Sequence
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# SQLAlchemy 2.0 核心导入
from sqlalchemy import (
select,
func,
delete,
update,
or_
)
from sqlalchemy.engine import Result
from sqlalchemy.exc import (
SQLAlchemyError,
IntegrityError
)
from sqlalchemy.orm import (
Session,
joinedload,
selectinload
)
# 导入模型和数据库会话
from blog.core import User, Role, Article, Comment, get_db
# --------------------------- 数据结构定义 ---------------------------
@dataclass
class PaginationResult:
"""分页结果数据结构"""
items: Sequence[Any]
total: int
page: int
page_size: int
total_pages: int
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"items": [item.__dict__ for item in self.items], # 可替换为自定义序列化逻辑
"total": self.total,
"page": self.page,
"page_size": self.page_size,
"total_pages": self.total_pages
}
# --------------------------- 异常定义 ---------------------------
class BusinessException(Exception):
"""业务异常基类"""
def __init__(self, message: str, code: int = 400):
self.message = message
self.code = code
super().__init__(self.message)
class ResourceExistsException(BusinessException):
"""资源已存在异常"""
def __init__(self, message: str):
super().__init__(message, 409)
class ResourceNotFoundException(BusinessException):
"""资源不存在异常"""
def __init__(self, message: str):
super().__init__(message, 404)
class PermissionDeniedException(BusinessException):
"""权限不足异常"""
def __init__(self, message: str):
super().__init__(message, 403)
# --------------------------- 通用工具函数 ---------------------------
def commit_with_retry(db: Session, max_retries: int = 3) -> None:
"""
提交事务并支持重试(处理并发冲突)
"""
retries = 0
while retries < max_retries:
try:
db.commit()
return
except SQLAlchemyError as ex:
retries += 1
db.rollback()
if retries >= max_retries:
raise BusinessException(f"数据库操作失败:{str(ex)}")
def get_password_hash(password: str) -> str:
"""
密码加密(生产环境必需)
"""
return password
# --------------------------- 用户管理 ---------------------------
def create_user(
db: Session,
username: str,
email: str,
password: str,
age: Optional[int] = None
) -> User:
"""
创建用户(SQLAlchemy 2.0 优化版)
Args:
db: 数据库会话
username: 用户名
email: 邮箱
password: 原始密码(自动加密)
age: 年龄
Returns:
创建的用户对象
Raises:
ResourceExistsException: 用户名/邮箱已存在
BusinessException: 数据库操作失败
"""
# 2.0 优化:一次查询检查用户名和邮箱,减少数据库交互
stmt = select(User).where(
or_(User.username == username, User.email == email)
)
existing_user: Optional[User] = db.execute(stmt).scalar_one_or_none()
if existing_user:
if existing_user.username == username:
raise ResourceExistsException("用户名已存在")
else:
raise ResourceExistsException("邮箱已存在")
# 密码加密(生产环境必需)
hashed_password = get_password_hash(password)
# 创建用户
new_user = User(
username=username,
email=email,
password=hashed_password,
is_active=True,
age=age
)
# 分配普通用户角色(优化:使用 selectinload 减少N+1查询)
user_role: Optional[Role] = db.execute(
select(Role).where(Role.name == "user")
).scalar_one_or_none()
if user_role:
new_user.roles.append(user_role)
try:
db.add(new_user)
commit_with_retry(db)
db.refresh(new_user)
# 刷新后加载角色信息
db.execute(select(User).options(joinedload(User.roles)).where(User.id == new_user.id))
return new_user
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"创建用户失败:{str(ex)}")
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""
根据用户名查询用户(包含角色)
Args:
db: 数据库会话
username: 用户名
Returns:
用户对象或None
"""
stmt = select(User).options(
selectinload(User.roles) # 2.0 推荐:selectinload 性能优于 joinedload 多对多
).where(User.username == username)
return db.execute(stmt).scalar_one_or_none()
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
"""
根据ID查询用户(包含角色)
Args:
db: 数据库会话
user_id: 用户ID
Returns:
用户对象或None
"""
stmt = select(User).options(
selectinload(User.roles)
).where(User.id == user_id)
return db.execute(stmt).scalar_one_or_none()
def update_user(
db: Session,
user_id: int,
**kwargs
) -> User:
"""
修改用户信息
Args:
db: 数据库会话
user_id: 用户ID
**kwargs: 要更新的字段(如 email, age, is_active 等)
Returns:
更新后的用户对象
Raises:
ResourceNotFoundException: 用户不存在
BusinessException: 更新失败
"""
# 检查用户是否存在
db_user = get_user_by_id(db, user_id)
if not db_user:
raise ResourceNotFoundException("用户不存在")
# 过滤合法字段,防止更新敏感字段
allowed_fields = {"username", "email", "age", "is_active"}
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not update_fields:
return db_user
# 2.0 优化:批量更新字段
try:
for key, value in update_fields.items():
setattr(db_user, key, value)
commit_with_retry(db)
db.refresh(db_user)
return db_user
except IntegrityError as ex:
db.rollback()
raise ResourceExistsException(f"更新失败:{str(ex)}")
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"更新用户失败:{str(ex)}")
def delete_user(db: Session, user_id: int) -> bool:
"""
删除用户(2.0 批量删除写法,无需先查询)
Args:
db: 数据库会话
user_id: 用户ID
Returns:
删除成功返回True
Raises:
ResourceNotFoundException: 用户不存在
"""
stmt = delete(User).where(User.id == user_id)
result: Result = db.execute(stmt)
if result.rowcount == 0: # type: ignore[attr-defined]
raise ResourceNotFoundException("用户不存在")
commit_with_retry(db)
return True
# --------------------------- 文章管理 ---------------------------
def create_article(
db: Session,
title: str,
content: str,
user_id: int
) -> Article:
"""
发布文章
Args:
db: 数据库会话
title: 文章标题
content: 文章内容
user_id: 作者ID
Returns:
文章对象
Raises:
ResourceNotFoundException: 用户不存在
BusinessException: 创建失败
"""
# 检查用户是否存在
if not db.execute(select(User.id).where(User.id == user_id)).scalar():
raise ResourceNotFoundException("用户不存在")
new_article = Article(
title=title,
content=content,
user_id=user_id
)
try:
db.add(new_article)
commit_with_retry(db)
db.refresh(new_article)
return new_article
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"发布文章失败:{str(ex)}")
def get_article_list(
db: Session,
page: int = 1,
page_size: int = 10
) -> PaginationResult:
"""
分页查询文章列表(包含作者)
Args:
db: 数据库会话
page: 页码(默认1)
page_size: 每页条数(默认10)
Returns:
分页结果对象
"""
# 参数校验
page = max(1, page)
page_size = max(1, min(100, page_size)) # 限制最大页大小
offset = (page - 1) * page_size
# 2.0 优化:使用 with_for_update(read=True) 避免脏读(可选)
# 查询文章列表(优化:只加载需要的字段)
stmt = select(Article).options(
joinedload(Article.author).load_only(User.id, User.username, User.email)
).order_by(Article.create_time.desc()).offset(offset).limit(page_size)
articles: Sequence[Article] = db.execute(stmt).scalars().all()
# 总数量(优化:避免子查询)
total: int = db.execute(select(func.count(Article.id))).scalar()
total_pages = (total + page_size - 1) // page_size
return PaginationResult(
items=articles,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages
)
def get_article_detail(db: Session, article_id: int) -> Article:
"""
查询文章详情(包含作者、评论及评论作者)
Args:
db: 数据库会话
article_id: 文章ID
Returns:
文章对象
Raises:
ResourceNotFoundException: 文章不存在
"""
stmt = select(Article).options(
joinedload(Article.author).load_only(User.id, User.username),
joinedload(Article.comments).joinedload(
Comment.author
).load_only(User.id, User.username)
).where(Article.id == article_id)
# 使用 unique() 方法处理连接集合导致的重复数据
db_article: Optional[Article] = db.execute(stmt).unique().scalar_one_or_none()
if not db_article:
raise ResourceNotFoundException("文章不存在")
# 阅读量+1(2.0 优化:使用 update 语句,无需查询后修改)
db.execute(
update(Article)
.where(Article.id == article_id)
.values(read_count=Article.read_count + 1)
)
commit_with_retry(db)
# 刷新阅读量
db.refresh(db_article, attribute_names=["read_count"])
return db_article
def update_article(
db: Session,
article_id: int,
user_id: int,
**kwargs
) -> Article:
"""
修改文章(仅作者可修改)
Args:
db: 数据库会话
article_id: 文章ID
user_id: 操作人ID
**kwargs: 要更新的字段(title, content)
Returns:
更新后的文章对象
Raises:
ResourceNotFoundException: 文章/用户不存在
PermissionDeniedException: 无权限
"""
# 检查文章是否存在并验证权限
stmt = select(Article).where(Article.id == article_id)
db_article: Optional[Article] = db.execute(stmt).scalar_one_or_none()
if not db_article:
raise ResourceNotFoundException("文章不存在")
if db_article.user_id != user_id:
raise PermissionDeniedException("无权限修改该文章")
# 过滤合法字段
allowed_fields = {"title", "content"}
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not update_fields:
return db_article
try:
for key, value in update_fields.items():
setattr(db_article, key, value)
commit_with_retry(db)
db.refresh(db_article)
return db_article
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"修改文章失败:{str(ex)}")
def delete_article(
db: Session,
article_id: int,
user_id: int
) -> bool:
"""
删除文章(仅作者/管理员可删除)
Args:
db: 数据库会话
article_id: 文章ID
user_id: 操作人ID
Returns:
删除成功返回True
Raises:
ResourceNotFoundException: 文章/用户不存在
PermissionDeniedException: 无权限
"""
# 检查文章是否存在
db_article: Optional[Article] = db.execute(
select(Article).where(Article.id == article_id)
).scalar_one_or_none()
if not db_article:
raise ResourceNotFoundException("文章不存在")
# 检查用户和权限
db_user = get_user_by_id(db, user_id)
if not db_user:
raise ResourceNotFoundException("操作用户不存在")
is_admin = any(role.name == "admin" for role in db_user.roles)
if db_article.user_id != user_id and not is_admin:
raise PermissionDeniedException("无权限删除该文章")
try:
db.delete(db_article)
commit_with_retry(db)
return True
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"删除文章失败:{str(ex)}")
# --------------------------- 评论管理 ---------------------------
def create_comment(
db: Session,
content: str,
user_id: int,
article_id: int
) -> Comment:
"""
新增评论
Args:
db: 数据库会话
content: 评论内容
user_id: 评论者ID
article_id: 文章ID
Returns:
评论对象
Raises:
ResourceNotFoundException: 用户/文章不存在
"""
# 2.0 优化:一次查询检查用户和文章
user_exists = db.execute(select(User.id).where(User.id == user_id)).scalar()
article_exists = db.execute(select(Article.id).where(Article.id == article_id)).scalar()
if not user_exists:
raise ResourceNotFoundException("用户不存在")
if not article_exists:
raise ResourceNotFoundException("文章不存在")
db_comment = Comment(
content=content,
user_id=user_id,
article_id=article_id
)
try:
db.add(db_comment)
commit_with_retry(db)
db.refresh(db_comment)
return db_comment
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"新增评论失败:{str(ex)}")
def delete_comment(
db: Session,
comment_id: int,
user_id: int
) -> bool:
"""
删除评论(仅评论者/管理员可删除)
Args:
db: 数据库会话
comment_id: 评论ID
user_id: 操作人ID
Returns:
删除成功返回True
Raises:
ResourceNotFoundException: 评论/用户不存在
PermissionDeniedException: 无权限
"""
# 检查评论是否存在
db_comment: Optional[Comment] = db.execute(
select(Comment).where(Comment.id == comment_id)
).scalar_one_or_none()
if not db_comment:
raise ResourceNotFoundException("评论不存在")
# 检查用户和权限
db_user = get_user_by_id(db, user_id)
if not db_user:
raise ResourceNotFoundException("操作用户不存在")
is_admin = any(role.name == "admin" for role in db_user.roles)
if db_comment.user_id != user_id and not is_admin:
raise PermissionDeniedException("无权限删除该评论")
try:
db.delete(db_comment)
commit_with_retry(db)
return True
except SQLAlchemyError as ex:
db.rollback()
raise BusinessException(f"删除评论失败:{str(ex)}")
# --------------------------- 测试功能 ---------------------------
if __name__ == "__main__":
# 获取数据库会话
db_session: Session = next(get_db())
try:
# 1. 初始化角色
roles = db_session.execute(select(Role).where(Role.name.in_(["admin", "guest"]))).scalars().all()
role_names = {role.name for role in roles}
missing_roles = []
if "admin" not in role_names:
missing_roles.append(Role(name="admin", description="系统管理员"))
if "guest" not in role_names:
missing_roles.append(Role(name="guest", description="来宾用户"))
if missing_roles:
db_session.add_all(missing_roles)
commit_with_retry(db_session)
print("初始化角色成功")
# 2. 创建测试用户
try:
user = create_user(
db_session,
username="test_user",
email="test@example.com",
password="123456",
age=23
)
print(f"创建用户成功:{user.username} (ID: {user.id})") # 输出: 创建用户成功:test_user (ID: 1)
except ResourceExistsException as e:
print(f"创建用户失败:{e.message}")
# 获取已存在的用户
user = get_user_by_username(db_session, "test_user")
if not user:
raise
# 3. 发布测试文章
article = create_article(
db_session,
title="我的第一篇博客",
content="SQLAlchemy 2.0 实战案例",
user_id=user.id
)
print(f"发布文章成功:{article.title} (ID: {article.id})") # 输出: 发布文章成功:我的第一篇博客 (ID: 1)
# 4. 新增评论
comment = create_comment(
db_session,
content="这篇文章写得很好!",
user_id=user.id,
article_id=article.id
)
print(f"新增评论成功:{comment.content} (ID: {comment.id})") # 输出: 新增评论成功:这篇文章写得很好! (ID: 1)
# 5. 查询文章详情
detail = get_article_detail(db_session, article.id)
print("\n=== 文章详情 ===")
print(f"标题:{detail.title}") # 输出: 标题:我的第一篇博客
print(f"作者:{detail.author.username}") # 输出: 作者:test_user
print(f"阅读量:{detail.read_count}") # 输出: 阅读量:1
print(f"评论数:{len(detail.comments)}") # 输出: 评论数:1
print(f"第一条评论:{detail.comments[0].content}") # 输出: 第一条评论:这篇文章写得很好!
print(f"评论作者:{detail.comments[0].author.username}") # 输出: 评论作者:test_user
# 6. 测试分页查询
pagination = get_article_list(db_session, page=1, page_size=10)
print(f"\n=== 分页查询 ===")
print(f"总文章数:{pagination.total}") # 输出: 总文章数:1
print(f"当前页:{pagination.page}/{pagination.total_pages}") # 输出: 当前页:1/1
except BusinessException as e:
print(f"业务异常:{e.message} (代码:{e.code})")
except Exception as e:
print(f"测试失败:{str(e)}")
db_session.rollback()
finally:
# 关闭会话
db_session.close()
print("\n数据库会话已关闭")
执行后,会创建用户、发布文章、新增评论,并查询详情:
2.电商系统
2.1.案例背景
核心业务模型:
- 用户 (User):系统用户,可创建订单
- 商品 (Product):商品信息,包含分类、库存、价格等
- 订单 (Order):用户创建的订单,包含多个订单项
- 订单项 (OrderItem):订单中的商品条目(关联订单和商品)
- 商品分类 (Category):商品分类(一对多)
项目结构:
D:\Workspaces\python\test\Test01\mall
│ core.py
│ crud.py
│ mall.db
│ test.py
│ __init__.py
2.2.完整代码实现
2.2.1.核心模型与数据库配置
项目目录下,创建 core.py 文件:
# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 核心配置与数据模型
========================================
功能:
1. 定义SQLAlchemy 2.0的基础模型(DeclarativeBase)
2. 配置数据库连接与会话工厂
3. 定义业务数据模型(用户、商品、分类、订单、订单项)
4. 实现通用分页数据结构
依赖:
- SQLAlchemy 2.0.48
- Python 3.13.11
创建时间:2026-03-22
作者:inuex
版本:v1.0
"""
import datetime
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Generator, Sequence
from sqlalchemy import (
create_engine,
ForeignKey,
Integer,
String,
Float,
Boolean,
DateTime,
Text,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
DeclarativeBase,
Session,
sessionmaker,
Mapped,
mapped_column,
relationship,
)
# --------------------------- 基础配置 ---------------------------
class Base(DeclarativeBase):
"""所有模型的抽象基类"""
__abstract__ = True
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
create_time: Mapped[datetime.datetime] = mapped_column(
DateTime, default=datetime.datetime.now(datetime.UTC), comment="创建时间"
)
update_time: Mapped[datetime.datetime] = mapped_column(
DateTime, default=datetime.datetime.now(datetime.UTC), onupdate=datetime.datetime.now(datetime.UTC),
comment="更新时间"
)
is_deleted: Mapped[bool] = mapped_column(
Boolean, default=False, comment="是否删除(软删除)"
)
# 数据库连接(SQLite,可替换为MySQL/PostgreSQL)
DATABASE_URL = "sqlite:///./mall.db"
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False},
echo=False, # 设为True可打印SQL语句,便于学习
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# --------------------------- 数据模型 ---------------------------
class User(Base):
"""用户模型"""
__tablename__ = "users"
username: Mapped[str] = mapped_column(String(50), unique=True, index=True, comment="用户名")
phone: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment="手机号")
email: Mapped[str] = mapped_column(String(100), nullable=True, comment="邮箱")
address: Mapped[str] = mapped_column(String(200), nullable=True, comment="收货地址")
balance: Mapped[float] = mapped_column(Float, default=0.0, comment="账户余额")
# 关联订单(一对多)
orders: Mapped[List["Order"]] = relationship(
"Order", back_populates="user", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username={self.username}, balance={self.balance})>"
class Category(Base):
"""商品分类模型"""
__tablename__ = "categories"
name: Mapped[str] = mapped_column(String(50), unique=True, comment="分类名称")
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True, comment="分类描述")
# 关联商品(一对多)
products: Mapped[List["Product"]] = relationship(
"Product", back_populates="category", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Category(id={self.id}, name={self.name})>"
class Product(Base):
"""商品模型"""
__tablename__ = "products"
name: Mapped[str] = mapped_column(String(100), index=True, comment="商品名称")
price: Mapped[float] = mapped_column(Float, comment="商品价格")
stock: Mapped[int] = mapped_column(Integer, default=0, comment="库存数量")
sales: Mapped[int] = mapped_column(Integer, default=0, comment="销量")
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True, comment="商品描述")
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"), comment="分类ID")
# 关联分类
category: Mapped["Category"] = relationship("Category", back_populates="products")
# 关联订单项
order_items: Mapped[List["OrderItem"]] = relationship(
"OrderItem", back_populates="product", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Product(id={self.id}, name={self.name}, price={self.price}, stock={self.stock})>"
class Order(Base):
"""订单模型"""
__tablename__ = "orders"
order_no: Mapped[str] = mapped_column(String(32), unique=True, index=True, comment="订单编号")
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), comment="用户ID")
total_amount: Mapped[float] = mapped_column(Float, default=0.0, comment="订单总金额")
status: Mapped[str] = mapped_column(String(20), default="pending",
comment="订单状态:pending/paid/shipped/completed/cancelled")
payment_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime, nullable=True, comment="支付时间")
# 关联用户
user: Mapped["User"] = relationship("User", back_populates="orders")
# 关联订单项
items: Mapped[List["OrderItem"]] = relationship(
"OrderItem", back_populates="order", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Order(id={self.id}, order_no={self.order_no}, status={self.status}, total_amount={self.total_amount})>"
class OrderItem(Base):
"""订单项模型(订单-商品 多对多关联)"""
__tablename__ = "order_items"
order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), comment="订单ID")
product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), comment="商品ID")
quantity: Mapped[int] = mapped_column(Integer, default=1, comment="购买数量")
unit_price: Mapped[float] = mapped_column(Float, comment="购买时的单价")
# 关联订单
order: Mapped["Order"] = relationship("Order", back_populates="items")
# 关联商品
product: Mapped["Product"] = relationship("Product", back_populates="order_items")
def __repr__(self) -> str:
return f"<OrderItem(id={self.id}, order_id={self.order_id}, product_id={self.product_id}, quantity={self.quantity})>"
# --------------------------- 分页数据结构 ---------------------------
@dataclass
class Pagination:
"""通用分页返回结构"""
items: Sequence[Any]
total: int
page: int
page_size: int
total_pages: int
has_next: bool
has_prev: bool
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"items": [i.__dict__ for i in self.items],
"pagination": {
"total": self.total,
"page": self.page,
"page_size": self.page_size,
"total_pages": self.total_pages,
"has_next": self.has_next,
"has_prev": self.has_prev
}
}
# --------------------------- 核心业务逻辑 ---------------------------
def create_tables() -> None:
"""创建所有表"""
Base.metadata.create_all(bind=engine)
print("数据库表创建成功!")
def get_db() -> Generator[Session, None, None]:
"""获取数据库会话"""
db_session: Session = SessionLocal()
try:
yield db_session
except SQLAlchemyError as ex:
db_session.rollback()
raise ex
finally:
db_session.close()
2.2.2.数据操作层(CRUD)
项目目录下,创建 crud.py 文件:
# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 数据操作层(CRUD)
=====================================
功能:
1. 实现商品、订单、用户等核心业务的增删改查
2. 封装分页查询、条件过滤、排序等通用查询逻辑
3. 处理电商核心业务逻辑(订单创建、库存扣减、状态更新)
4. 实现事务管理与异常处理
依赖:
- core.py(数据模型与数据库配置)
- SQLAlchemy 2.0.48
创建时间:2026-03-22
作者:inuex
版本:v1.0
注意:
1. 所有数据库操作需通过Session会话执行,禁止直接操作数据库连接
2. 订单创建包含事务逻辑,异常时自动回滚
3. 分页查询默认最大页大小为50,防止大数据量查询
"""
import datetime
from typing import List, Tuple, Sequence
from sqlalchemy import (
select,
func,
and_,
desc,
asc,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import (
Session,
joinedload,
selectinload,
)
from mall.core import Category, Product, User, Pagination, Order, OrderItem
# ====================== 商品管理 ======================
def create_product(
db: Session,
name: str,
price: float,
category_id: int,
stock: int = 0,
description: str = None
) -> Product:
"""创建商品"""
# 检查分类是否存在
category = db.execute(select(Category).where(Category.id == category_id)).scalar_one_or_none()
if not category:
raise ValueError(f"分类ID {category_id} 不存在")
db_product = Product(
name=name,
price=price,
stock=stock,
description=description,
category_id=category_id
)
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
def get_product_list(
db: Session,
page: int = 1,
page_size: int = 10,
category_id: int = None,
min_price: float = None,
max_price: float = None,
keyword: str = None,
sort_by: str = "create_time",
sort_order: str = "desc"
) -> Pagination:
"""
分页查询商品列表(支持多条件过滤、排序)
:param db: 数据库会话
:param page: 页码
:param page_size: 每页条数
:param category_id: 分类ID(可选)
:param min_price: 最低价格(可选)
:param max_price: 最高价格(可选)
:param keyword: 商品名称关键词(可选)
:param sort_by: 排序字段(create_time/price/stock/sales)
:param sort_order: 排序方向(asc/desc)
:return: 分页结果
"""
# 参数校验
page = max(1, page)
page_size = max(1, min(50, page_size)) # 限制最大页大小
offset = (page - 1) * page_size
# 构建查询条件
conditions = [Product.is_deleted == False]
if category_id:
conditions.append(Product.category_id == category_id)
if min_price is not None:
conditions.append(Product.price >= min_price)
if max_price is not None:
conditions.append(Product.price <= max_price)
if keyword:
conditions.append(Product.name.like(f"%{keyword}%"))
# 构建排序规则
sort_column = getattr(Product, sort_by, Product.create_time)
sort_func = desc if sort_order.lower() == "desc" else asc
# 构建查询
stmt = select(Product).options(
joinedload(Product.category).load_only(Category.id, Category.name) # 关联加载分类(只加载必要字段)
).where(and_(*conditions)).order_by(sort_func(sort_column)).offset(offset).limit(page_size)
# 执行查询
products: Sequence[Product] = db.execute(stmt).scalars().all()
# 查询总数(复用条件,不包含分页和排序)
count_stmt = select(func.count(Product.id)).where(and_(*conditions))
total = db.execute(count_stmt).scalar()
# 计算分页信息
total_pages = (total + page_size - 1) // page_size
has_next = page < total_pages
has_prev = page > 1
return Pagination(
items=products,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages,
has_next=has_next,
has_prev=has_prev
)
# ====================== 订单管理 ======================
def generate_order_no() -> str:
"""生成唯一订单编号"""
return datetime.datetime.now().strftime("%Y%m%d%H%M%S") + str(datetime.datetime.now().microsecond)[:4]
def create_order(
db: Session,
user_id: int,
product_items: List[Tuple[int, int]] # [(product_id, quantity), ...]
) -> Order:
"""
创建订单(包含事务管理、库存检查、金额计算)
:param db: 数据库会话
:param user_id: 用户ID
:param product_items: 商品列表 [(商品ID, 数量), ...]
:return: 创建的订单
"""
if not product_items:
raise ValueError("订单中至少包含一个商品")
# 1. 检查用户是否存在
user = db.execute(select(User).where(User.id == user_id)).scalar_one_or_none()
if not user:
raise ValueError(f"用户ID {user_id} 不存在")
# 2. 检查商品库存并计算总金额
total_amount = 0.0
order_items = []
for product_id, quantity in product_items:
if quantity <= 0:
raise ValueError(f"商品ID {product_id} 购买数量必须大于0")
# 加锁查询商品(防止超卖)
product = db.execute(
select(Product).where(and_(Product.id == product_id, Product.is_deleted == False)).with_for_update()
).scalar_one_or_none()
if not product:
raise ValueError(f"商品ID {product_id} 不存在或已删除")
if product.stock < quantity:
raise ValueError(f"商品 {product.name} 库存不足(当前库存:{product.stock},请求数量:{quantity})")
# 计算金额
item_amount = product.price * quantity
total_amount += item_amount
# 创建订单项
order_items.append(OrderItem(
product_id=product_id,
quantity=quantity,
unit_price=product.price
))
# 扣减库存,增加销量
product.stock -= quantity
product.sales += quantity
# 3. 创建订单
order = Order(
order_no=generate_order_no(),
user_id=user_id,
total_amount=total_amount,
status="pending",
items=order_items
)
try:
db.add(order)
db.commit()
db.refresh(order)
# 关联加载订单项和商品信息
db.execute(
select(Order).options(
joinedload(Order.items).joinedload(OrderItem.product),
joinedload(Order.user)
).where(Order.id == order.id)
)
return order
except SQLAlchemyError as ex:
db.rollback()
raise ex
def get_order_list(
db: Session,
page: int = 1,
page_size: int = 10,
user_id: int = None,
status: str = None,
start_time: datetime.datetime = None,
end_time: datetime.datetime = None
) -> Pagination:
"""
分页查询订单列表(支持多条件过滤)
"""
page = max(1, page)
page_size = max(1, min(50, page_size))
offset = (page - 1) * page_size
# 构建条件
conditions = [Order.is_deleted == False]
if user_id:
conditions.append(Order.user_id == user_id)
if status:
conditions.append(Order.status == status)
if start_time:
conditions.append(Order.create_time >= start_time)
if end_time:
conditions.append(Order.create_time <= end_time)
# 构建查询(关联加载用户和订单项)
stmt = select(Order).options(
joinedload(Order.user).load_only(User.id, User.username, User.phone),
selectinload(Order.items).joinedload(OrderItem.product).load_only(Product.id, Product.name, Product.price)
).where(and_(*conditions)).order_by(desc(Order.create_time)).offset(offset).limit(page_size)
orders = db.execute(stmt).scalars().all()
# 查询总数
count_stmt = select(func.count(Order.id)).where(and_(*conditions))
total = db.execute(count_stmt).scalar()
total_pages = (total + page_size - 1) // page_size
has_next = page < total_pages
has_prev = page > 1
return Pagination(
items=orders,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages,
has_next=has_next,
has_prev=has_prev
)
def update_order_status(
db: Session,
order_id: int,
status: str,
payment_time: datetime.datetime = None
) -> Order:
"""
更新订单状态
:param db: 数据库会话
:param order_id: 订单ID
:param status: 新状态
:param payment_time: 支付时间(仅status=paid时需要)
:return: 更新后的订单
"""
valid_status = ["pending", "paid", "shipped", "completed", "cancelled"]
if status not in valid_status:
raise ValueError(f"无效的订单状态,可选值:{valid_status}")
db_order = db.execute(select(Order).where(Order.id == order_id)).scalar_one_or_none()
if not db_order:
raise ValueError(f"订单ID {order_id} 不存在")
# 如果是取消订单,恢复库存
if status == "cancelled" and db_order.status != "cancelled":
for i in db_order.items:
db_product = db.execute(select(Product).where(Product.id == i.product_id)).scalar()
if db_product:
db_product.stock += i.quantity
db_product.sales -= i.quantity
# 更新订单状态
db_order.status = status
if status == "paid" and payment_time:
db_order.payment_time = payment_time
db.commit()
db.refresh(db_order)
return db_order
2.2.3.功能测试脚本
项目目录下,创建 test.py 文件:
# -*- coding: utf-8 -*-
"""
电商订单管理系统 - 功能测试脚本
=================================
功能:
1. 验证数据库表创建
2. 测试核心CRUD功能(商品创建、订单生成、状态更新)
3. 验证分页查询、条件过滤、事务回滚等高级特性
4. 模拟电商业务流程(用户下单→库存扣减→订单状态更新)
测试场景:
- 商品分类初始化
- 商品创建与分页查询
- 订单创建(含库存检查)
- 订单状态更新与库存恢复(取消订单)
- 多条件过滤的商品查询
依赖:
- core.py + crud.py(核心业务层)
- SQLAlchemy 2.0.48
创建时间:2026-03-22
作者:inuex
版本:v1.0
运行方式:
python -m mall.test
"""
import datetime
from sqlalchemy import (
select,
)
from sqlalchemy.exc import SQLAlchemyError
from mall.core import create_tables, Category, Product, User, get_db
from mall.crud import create_product, create_order, get_product_list, update_order_status, get_order_list
# --------------------------- 测试代码 ---------------------------
def main():
# 获取数据库会话
db = next(get_db())
try:
# 1. 初始化分类
electronics = db.execute(select(Category).where(Category.name == "电子产品")).scalar_one_or_none()
if not electronics:
electronics = Category(name="电子产品", description="手机、电脑、平板等")
db.add(electronics)
db.commit()
# 2. 创建测试商品
iphone = db.execute(select(Product).where(Product.name == "iPhone 15")).scalar_one_or_none()
if not iphone:
iphone = create_product(
db,
name="iPhone 15",
price=5999.0,
category_id=electronics.id,
stock=100,
description="苹果15手机,128G版本"
)
macbook = db.execute(select(Product).where(Product.name == "MacBook Pro")).scalar_one_or_none()
if not macbook:
macbook = create_product(
db,
name="MacBook Pro",
price=12999.0,
category_id=electronics.id,
stock=50,
description="苹果笔记本电脑,M3芯片"
)
# 3. 创建测试用户
test_user = db.execute(select(User).where(User.username == "test_buyer")).scalar_one_or_none()
if not test_user:
test_user = User(
username="test_buyer",
phone="13800138000",
email="test@example.com",
address="北京市海淀区",
balance=20000.0
)
db.add(test_user)
db.commit()
# 4. 创建订单(购买2个iPhone 15 + 1个MacBook Pro)
try:
order = create_order(
db,
user_id=test_user.id,
product_items=[(iphone.id, 2), (macbook.id, 1)]
)
print(f"创建订单成功:{order}") # 输出: <Order(id=1, order_no=202603221724427528, status=pending, total_amount=24997.0)>
print(f"订单总金额:{order.total_amount}") # 输出: 24997.0
print(f"订单项数量:{len(order.items)}") # 输出: 2
# 打印订单项详情
for item in order.items:
print(f" - 商品:{item.product.name},数量:{item.quantity},单价:{item.unit_price}")
# 输出:
# - 商品:iPhone 15,数量:2,单价:5999.0
# - 商品:MacBook Pro,数量:1,单价:12999.0
except ValueError as e:
print(f"创建订单失败:{e}") # 输出:
# 5. 分页查询商品(测试多条件过滤)
print("\n=== 分页查询商品(价格区间 5000-15000,按价格升序)===")
product_pagination = get_product_list(
db,
page=1,
page_size=5,
min_price=5000.0,
max_price=15000.0,
sort_by="price",
sort_order="asc"
)
print(f"总商品数:{product_pagination.total}") # 输出: 2
print(f"当前页:{product_pagination.page}/{product_pagination.total_pages}") # 输出: 1/1
print(f"是否有下一页:{product_pagination.has_next}") # 输出: False
for product in product_pagination.items:
print(f" - {product.name},价格:{product.price},分类:{product.category.name}")
# 输出:
# - iPhone 15,价格:5999.0,分类:电子产品
# - MacBook Pro,价格:12999.0,分类:电子产品
# 6. 分页查询订单
print("\n=== 分页查询订单 ===")
order_pagination = get_order_list(
db,
page=1,
page_size=10,
user_id=test_user.id
)
print(f"总订单数:{order_pagination.total}") # 输出: 1
for order in order_pagination.items:
print(f" - 订单编号:{order.order_no},状态:{order.status},金额:{order.total_amount}")
# 输出:
# - 订单编号:202603221724427528,状态:pending,金额:24997.0
# 7. 更新订单状态为已支付
if order_pagination.items:
updated_order = update_order_status(
db,
order_id=order_pagination.items[0].id,
status="paid",
payment_time=datetime.datetime.now(datetime.UTC)
)
print(f"\n更新订单状态成功:{updated_order.order_no} -> {updated_order.status}") # 输出: 202603221724427528 -> paid
except SQLAlchemyError as e:
db.rollback()
print(f"数据库错误:{e}")
except ValueError as e:
print(f"业务错误:{e}")
finally:
db.close()
print("\n数据库会话已关闭")
if __name__ == "__main__":
# 创建表
create_tables()
# 运行测试
main()
2.3.核心知识点解析(深入理解 SQLAlchemy)
2.3.1.复杂关系映射
- 一对多:
Category -> Product、User -> Order,通过ForeignKey+relationship实现 - 多对多(隐式):
Order <-> Product通过中间表OrderItem实现(更灵活的多对多,可存储额外字段如购买数量、单价) - 级联操作:
cascade="all, delete-orphan"实现删除主表数据时自动删除关联子表数据
2.3.2.高级查询技巧
- 条件构建:使用
and_()、or_()组合多条件,支持动态条件拼接 - 关联加载优化:
joinedload():左连接加载关联数据(适合一对一 / 一对多)selectinload():IN 查询加载关联数据(适合多对多 / 一对多)load_only():只加载需要的字段,减少数据传输
- 行锁:
with_for_update()实现悲观锁,防止超卖(电商核心场景)
2.3.3.分页功能进阶
- 通用分页结构:封装
Pagination类,包含总数、页码、是否有下一页 / 上一页等信息 - 条件分页:分页查询支持多条件过滤(价格区间、分类、关键词)+ 自定义排序
- 参数校验:限制页大小范围,防止恶意请求(如 page_size=10000)
2.3.4.事务管理
- 完整事务:创建订单时包含「库存检查 → 扣减库存 → 创建订单 → 提交事务」完整流程
- 异常回滚:任何步骤出错都通过
db.rollback()回滚事务,保证数据一致性 - 并发控制:使用行锁防止超卖,解决电商核心并发问题
2.3.5.数据操作最佳实践
- 批量操作:
add_all()批量添加数据,减少数据库交互 - 更新优化:直接更新字段而非查询后修改(如库存扣减)
- 软删除:
is_deleted字段实现逻辑删除,保留数据记录
2.4.总结
- 关系建模:电商场景的多对多关系(订单 - 商品)通过中间表实现,比博客系统的简单关系更复杂,能充分理解 SQLAlchemy 的关系映射;
- 查询优化:关联加载、字段过滤、行锁等技巧,解决实际业务中的性能和并发问题;
- 分页进阶:支持多条件过滤、自定义排序的分页功能,覆盖实际项目中分页的常见需求;
- 事务管理:完整的事务流程 + 异常回滚,体现 SQLAlchemy 在数据一致性方面的能力;
- 业务结合:将 SQLAlchemy 操作与电商核心业务(订单创建、库存管理、订单状态更新)结合,理解 ORM 在实际项目中的应用方式。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)