Flask数据库集成、ORM与迁移管理深度解析

一、概述

数据库是Web应用的核心组件之一,Flask通过扩展提供了强大的数据库集成能力。本文将深入解析Flask-SQLAlchemy的配置原理、模型定义、关系映射、数据库操作API以及Flask-Migrate迁移管理机制,帮助开发者构建健壮的数据持久化层。

二、Flask-SQLAlchemy配置系统

2.1 核心配置参数

Flask-SQLAlchemy通过配置字典SQLALCHEMY_*控制数据库连接和行为:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 核心配置参数详解
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'  # 数据库连接URI
app.config['SQLALCHEMY_BINDS'] = {  # 多数据库绑定
    'users': 'sqlite:///users.db',
    'meta': 'postgresql://user:pass@localhost/meta'
}
app.config['SQLALCHEMY_ECHO'] = False  # 是否打印SQL语句
app.config['SQLALCHEMY_RECORD_QUERIES'] = None  # 是否记录查询(调试用)
app.config['SQLALCHEMY_NATIVE_UNICODE'] = None  # 是否使用原生Unicode
app.config['SQLALCHEMY_POOL_SIZE'] = 5  # 连接池大小
app.config['SQLALCHEMY_POOL_TIMEOUT'] = 30  # 连接池超时(秒)
app.config['SQLALCHEMY_POOL_RECYCLE'] = -1  # 连接回收时间(秒)
app.config['SQLALCHEMY_MAX_OVERFLOW'] = 10  # 连接池最大溢出
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # 是否追踪对象修改
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {  # SQLAlchemy引擎选项
    'pool_pre_ping': True,  # 连接前检查可用性
    'connect_args': {'timeout': 15}  # 连接参数
}

db = SQLAlchemy(app)

2.2 数据库URI格式

# 各种数据库URI格式示例

# SQLite(相对路径)
SQLITE_RELATIVE = 'sqlite:///app.db'  # 相对于实例文件夹

# SQLite(绝对路径)
SQLITE_ABSOLUTE = 'sqlite:////absolute/path/to/app.db'

# SQLite(内存数据库)
SQLITE_MEMORY = 'sqlite://:memory:'

# PostgreSQL
POSTGRESQL = 'postgresql://user:password@localhost:5432/mydb'

# PostgreSQL(带Unix Socket)
POSTGRESQL_SOCKET = 'postgresql:///mydb?host=/var/run/postgresql'

# MySQL
MYSQL = 'mysql://user:password@localhost:3306/mydb'

# MySQL(带驱动指定)
MYSQL_DRIVER = 'mysql+pymysql://user:password@localhost/mydb?charset=utf8mb4'

# Oracle
ORACLE = 'oracle://user:password@localhost:1521/sid'

# Oracle(带驱动)
ORACLE_DRIVER = 'oracle+cx_oracle://user:password@localhost:1521/sid'

# Microsoft SQL Server
MSSQL = 'mssql+pyodbc://user:password@dsn_name'
MSSQL_NEW = 'mssql+pymssql://user:password@hostname:port/dbname'

2.3 多数据库绑定配置

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 配置主数据库和绑定数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://main:pass@localhost/main'
app.config['SQLALCHEMY_BINDS'] = {
    'users': 'postgresql://users:pass@localhost/users',
    'logs': 'sqlite:///logs.db',
    'archive': 'mysql://archive:pass@localhost/archive'
}

db = SQLAlchemy(app)

# 绑定键命名规范
# - 使用字符串键名标识不同数据库
# - 每个绑定可以有独立的URI和配置
# - 模型通过__bind_key__指定所属数据库

2.4 配置系统架构图

Flask-SQLAlchemy配置系统

Flask App Config

SQLALCHEMY_DATABASE_URI

SQLALCHEMY_BINDS

SQLALCHEMY_ENGINE_OPTIONS

其他配置参数

主数据库引擎

绑定数据库引擎1

绑定数据库引擎2

绑定数据库引擎N

连接池

连接池

连接池

连接池

主数据库会话

绑定数据库会话

引擎选项

行为配置

ECHO/TRACK_MODIFICATIONS等

三、模型定义与字段类型

3.1 模型基类与声明式映射

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, Date, Time
from sqlalchemy import Float, Numeric, LargeBinary, JSON, Enum, PickleType
from sqlalchemy import ForeignKey, Table, UniqueConstraint, CheckConstraint
from sqlalchemy.orm import relationship, backref, deferred, validates, hybrid_property
from datetime import datetime
import enum

db = SQLAlchemy()

class BaseModel(db.Model):
    """抽象基类模型"""
    __abstract__ = True  # 声明为抽象基类,不创建表
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def save(self):
        """保存模型实例"""
        db.session.add(self)
        db.session.commit()
        return self
    
    def delete(self):
        """删除模型实例"""
        db.session.delete(self)
        db.session.commit()
    
    @classmethod
    def get_by_id(cls, id):
        """根据ID查询"""
        return cls.query.get(id)
    
    @classmethod
    def get_all(cls):
        """查询所有记录"""
        return cls.query.all()
    
    def to_dict(self):
        """转换为字典"""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

3.2 字段类型详解

class FieldTypeDemo(db.Model):
    """字段类型演示模型"""
    __tablename__ = 'field_types'
    
    # 整数类型
    integer_field = Column(Integer)  # 32位整数
    smallint_field = Column(db.SmallInteger)  # 16位整数
    bigint_field = Column(db.BigInteger)  # 64位整数
    
    # 字符串类型
    string_field = Column(String(100))  # 变长字符串,最大100字符
    char_field = Column(db.CHAR(10))  # 定长字符串
    text_field = Column(Text)  # 长文本
    nvarchar_field = Column(db.NVARCHAR(100))  # Unicode变长字符串
    
    # 布尔类型
    bool_field = Column(Boolean, default=False)
    
    # 日期时间类型
    datetime_field = Column(DateTime)
    date_field = Column(Date)
    time_field = Column(Time)
    
    # 浮点类型
    float_field = Column(Float)  # 双精度浮点
    real_field = Column(db.REAL)  # 单精度浮点
    
    # 定点数类型
    numeric_field = Column(Numeric(10, 2))  # 精度10,小数位2
    decimal_field = Column(db.DECIMAL(10, 2))
    
    # 二进制类型
    binary_field = Column(LargeBinary)  # 二进制数据
    blob_field = Column(db.BLOB)
    
    # JSON类型(SQLAlchemy 1.3+)
    json_field = Column(JSON)  # JSON数据
    
    # 枚举类型
    status_field = Column(Enum('active', 'inactive', 'pending', name='status_enum'))
    
    # Pickle类型(序列化Python对象)
    pickle_field = Column(PickleType)
    
    # UUID类型
    uuid_field = Column(db.UUID, unique=True)
    
    # Interval类型
    interval_field = Column(db.Interval)
    
    # ARRAY类型(PostgreSQL专用)
    # array_field = Column(db.ARRAY(Integer))

3.3 列参数详解

class ColumnParameters(db.Model):
    """列参数演示模型"""
    __tablename__ = 'column_params'
    
    # primary_key: 主键
    id = Column(Integer, primary_key=True)
    
    # nullable: 是否允许NULL
    name = Column(String(100), nullable=False)
    
    # default: 默认值(Python层面)
    role = Column(String(50), default='user')
    
    # server_default: 服务器端默认值
    created_at = Column(DateTime, server_default=db.func.now())
    
    # index: 创建索引
    email = Column(String(120), index=True)
    
    # unique: 唯一约束
    username = Column(String(80), unique=True)
    
    # autoincrement: 自增
    seq_num = Column(Integer, autoincrement=True)
    
    # comment: 列注释
    description = Column(Text, comment='用户描述信息')
    
    # doc: 文档字符串
    notes = Column(Text, doc='内部备注')
    
    # key: 属性名映射
    internal_name = Column('internal', String(50))
    
    # onupdate: 更新时回调
    updated_at = Column(DateTime, onupdate=db.func.now())
    
    # foreign_keys: 外键指定
    # category_id = Column(Integer, ForeignKey('category.id'), foreign_keys='category_id')
    
    # deferred: 延迟加载
    large_content = Column(Text, deferred=True)
    
    # system: 系统列标记
    system_flag = Column(Boolean, system=True)

3.4 约束定义

from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint, Index

class ConstraintDemo(db.Model):
    """约束演示模型"""
    __tablename__ = 'constraints_demo'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(120))
    age = Column(Integer)
    department = Column(String(50))
    position = Column(String(50))
    
    # 表级主键约束(复合主键)
    # __table_args__ = (
    #     PrimaryKeyConstraint('id', 'name', name='pk_demo'),
    # )
    
    # 表级唯一约束
    __table_args__ = (
        # 单列唯一约束(也可以在Column中使用unique=True)
        UniqueConstraint('email', name='uq_email'),
        
        # 复合唯一约束
        UniqueConstraint('department', 'position', name='uq_dept_position'),
        
        # 检查约束
        CheckConstraint('age >= 18', name='ck_age'),
        CheckConstraint("email LIKE '%@%'", name='ck_email_format'),
        
        # 索引定义
        Index('idx_name_dept', 'name', 'department'),
        Index('idx_name_lower', db.func.lower(name)),  # 函数索引
        
        # 表参数
        {'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8mb4'}
    )

3.5 模型定义架构图

contains

provides

mapped to

contains

manages

1

1

1

1

1

1

1

1

*

*

SQLAlchemy

+Model: Type

+session: Session

+metadata: MetaData

+engine: Engine

+init_app(app)

+create_all()

+drop_all()

«abstract»

Model

+query: Query

+tablename: str

+table: Table

+mapper: Mapper

+save()

+delete()

+to_dict()

Column

+name: str

+type: TypeEngine

+primary_key: bool

+nullable: bool

+default: Any

+server_default: DefaultClause

+index: bool

+unique: bool

+foreign_key: ForeignKey

Table

+name: str

+columns: ColumnCollection

+constraints: ConstraintCollection

+indexes: IndexCollection

+append_column(column)

MetaData

+tables: TableCollection

+reflect(bind)

+create_all(bind)

+drop_all(bind)

声明式基类\n所有模型继承此类

字段类型与约束\n定义表结构

四、关系定义详解

4.1 一对多关系

class Author(db.Model):
    """作者模型(一对多关系的一端)"""
    __tablename__ = 'authors'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(120), unique=True)
    
    # relationship参数详解
    # 第一个参数: 关联的模型类名(字符串形式支持延迟解析)
    # backref: 在关联模型上创建的反向引用属性名
    # lazy: 加载策略 ('select', 'joined', 'subquery', 'dynamic', 'noload')
    # cascade: 级联操作 ('save-update', 'delete', 'delete-orphan', 'all', 'refresh-expire')
    # order_by: 排序条件
    # foreign_keys: 外键指定(多外键时使用)
    # primaryjoin: 主连接条件
    # uselist: 是否使用列表(False表示标量关系)
    # viewonly: 只读关系
    # passive_deletes: 被动删除策略
    # single_parent: 单父约束
    books = relationship(
        'Book',
        backref=backref('author', lazy='joined'),  # 反向引用,joined加载
        lazy='dynamic',  # 动态加载,返回查询对象
        cascade='all, delete-orphan',  # 级联删除
        order_by='Book.published_date.desc()',  # 按出版日期降序
        passive_deletes=True  # 让数据库处理删除
    )
    
    def __repr__(self):
        return f'<Author {self.name}>'


class Book(db.Model):
    """图书模型(一对多关系的多端)"""
    __tablename__ = 'books'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    isbn = Column(String(20), unique=True)
    published_date = Column(Date)
    price = Column(Numeric(10, 2))
    
    # ForeignKey参数详解
    # 第一个参数: 关联表的列名(格式:'表名.列名')
    # ondelete: 数据库级删除行为 ('CASCADE', 'SET NULL', 'RESTRICT')
    # onupdate: 数据库级更新行为
    # deferrable: 是否延迟约束检查
    # initially: 初始约束模式
    author_id = Column(
        Integer,
        ForeignKey('authors.id', ondelete='CASCADE'),
        nullable=False
    )
    
    def __repr__(self):
        return f'<Book {self.title}>'

4.2 多对多关系

# 关联表(中间表)
student_course = Table(
    'student_course',
    db.Model.metadata,
    Column('student_id', Integer, ForeignKey('students.id', ondelete='CASCADE'), primary_key=True),
    Column('course_id', Integer, ForeignKey('courses.id', ondelete='CASCADE'), primary_key=True),
    Column('enrollment_date', DateTime, default=db.func.now()),  # 额外字段
    Column('grade', String(2)),  # 成绩
    Column('is_active', Boolean, default=True),  # 是否有效
    PrimaryKeyConstraint('student_id', 'course_id', name='pk_student_course')
)


class Student(db.Model):
    """学生模型"""
    __tablename__ = 'students'
    
    id = Column(Integer, primary_key=True)
    student_no = Column(String(20), unique=True, nullable=False)
    name = Column(String(100), nullable=False)
    
    # 多对多关系
    # secondary: 关联表
    # secondaryjoin: 次连接条件(复杂关联表时使用)
    # primaryjoin: 主连接条件(复杂关联表时使用)
    courses = relationship(
        'Course',
        secondary=student_course,
        backref=backref('students', lazy='dynamic'),
        lazy='select',
        cascade='all, delete',
        passive_deletes=True
    )
    
    def enroll(self, course, grade=None):
        """选课"""
        if course not in self.courses:
            self.courses.append(course)
            # 更新关联表额外字段
            db.session.execute(
                student_course.update()
                .where(student_course.c.student_id == self.id)
                .where(student_course.c.course_id == course.id)
                .values(grade=grade)
            )
    
    def drop(self, course):
        """退课"""
        if course in self.courses:
            self.courses.remove(course)


class Course(db.Model):
    """课程模型"""
    __tablename__ = 'courses'
    
    id = Column(Integer, primary_key=True)
    code = Column(String(20), unique=True, nullable=False)
    name = Column(String(200), nullable=False)
    credits = Column(Integer, default=3)
    
    def __repr__(self):
        return f'<Course {self.code}: {self.name}>'

4.3 一对一关系

class User(db.Model):
    """用户模型"""
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True, nullable=False)
    email = Column(String(120), unique=True, nullable=False)
    
    # 一对一关系
    # uselist=False: 标量关系,返回单个对象而非列表
    # cascade='all, delete-orphan': 删除用户时同时删除档案
    profile = relationship(
        'UserProfile',
        backref=backref('user', uselist=False),  # 反向也是一对一
        uselist=False,
        cascade='all, delete-orphan',
        single_parent=True  # 确保档案只能属于一个用户
    )


class UserProfile(db.Model):
    """用户档案模型"""
    __tablename__ = 'user_profiles'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), unique=True, nullable=False)  # unique确保一对一
    avatar = Column(String(200))
    bio = Column(Text)
    location = Column(String(100))
    website = Column(String(200))
    birthday = Column(Date)
    
    def __repr__(self):
        return f'<UserProfile of {self.user.username}>'

4.4 自引用关系

class Category(db.Model):
    """分类模型(自引用关系)"""
    __tablename__ = 'categories'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    parent_id = Column(Integer, ForeignKey('categories.id'))
    
    # 自引用一对多关系(父分类 -> 子分类)
    # remote_side: 指定远程端列,用于区分关系方向
    children = relationship(
        'Category',
        backref=backref('parent', remote_side=[id]),
        lazy='select',
        cascade='all, delete-orphan',
        single_parent=True
    )
    
    def get_ancestors(self):
        """获取所有祖先分类"""
        ancestors = []
        current = self.parent
        while current:
            ancestors.append(current)
            current = current.parent
        return ancestors
    
    def get_descendants(self):
        """获取所有后代分类"""
        descendants = []
        def collect_children(category):
            for child in category.children:
                descendants.append(child)
                collect_children(child)
        collect_children(self)
        return descendants


class Employee(db.Model):
    """员工模型(自引用多对多)"""
    __tablename__ = 'employees'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    
    # 自引用多对多关系(下属关系)
    subordinate = Table(
        'employee_subordinate',
        db.Model.metadata,
        Column('manager_id', Integer, ForeignKey('employees.id'), primary_key=True),
        Column('subordinate_id', Integer, ForeignKey('employees.id'), primary_key=True),
        CheckConstraint('manager_id != subordinate_id', name='ck_no_self_reference')
    )
    
    # 下属员工
    subordinates = relationship(
        'Employee',
        secondary=subordinate,
        primaryjoin=id == subordinate.c.manager_id,
        secondaryjoin=id == subordinate.c.subordinate_id,
        backref=backref('managers', lazy='dynamic'),
        lazy='dynamic'
    )

4.5 关系加载策略

class Order(db.Model):
    """订单模型"""
    __tablename__ = 'orders'
    
    id = Column(Integer, primary_key=True)
    order_no = Column(String(50), unique=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    status = Column(String(20))
    
    # 加载策略详解
    
    # 1. lazy='select' (默认): 延迟加载,首次访问时发出SELECT
    items_select = relationship('OrderItem', lazy='select')
    
    # 2. lazy='joined': 预加载,使用JOIN一次性加载
    items_joined = relationship('OrderItem', lazy='joined')
    
    # 3. lazy='subquery': 子查询预加载,发出额外SELECT
    items_subquery = relationship('OrderItem', lazy='subquery')
    
    # 4. lazy='dynamic': 动态加载,返回查询对象,可进一步筛选
    items_dynamic = relationship('OrderItem', lazy='dynamic')
    
    # 5. lazy='noload': 不加载,返回空列表
    items_noload = relationship('OrderItem', lazy='noload')
    
    # 6. lazy='raise': 访问时抛出异常(防止N+1问题)
    items_raise = relationship('OrderItem', lazy='raise')


# 运行时加载控制
from sqlalchemy.orm import joinedload, subqueryload, lazyload, contains_eager

# 使用joinedload预加载
orders = Order.query.options(joinedload(Order.items_joined)).all()

# 使用subqueryload预加载
orders = Order.query.options(subqueryload(Order.items_subquery)).all()

# 多层预加载
orders = Order.query.options(
    joinedload(Order.items_joined).joinedload(OrderItem.product)
).all()

# 条件加载
orders = Order.query.filter(Order.status == 'pending').all()

4.6 关系定义架构图

writes

enrolls

has

parent-child

manages

places

contains

in

AUTHOR

int

id

PK

string

name

string

email

BOOK

int

id

PK

int

author_id

FK

string

title

string

isbn

date

published_date

STUDENT

int

id

PK

string

student_no

string

name

COURSE

int

id

PK

string

code

string

name

int

credits

USER

int

id

PK

string

username

string

email

USER_PROFILE

int

id

PK

int

user_id

FK,UK

string

avatar

text

bio

CATEGORY

int

id

PK

string

name

int

parent_id

FK

EMPLOYEE

int

id

PK

string

name

CUSTOMER

int

id

PK

string

name

ORDER

int

id

PK

int

customer_id

FK

string

order_no

string

status

ORDER_ITEM

int

id

PK

int

order_id

FK

int

product_id

FK

int

quantity

PRODUCT

int

id

PK

string

name

decimal

price

五、数据库操作API

5.1 会话管理

from flask import Flask
from contextlib import contextmanager

app = Flask(__name__)
db = SQLAlchemy(app)

# 会话基本操作
class SessionOperations:
    """会话操作示例"""
    
    @staticmethod
    def add_example():
        """添加记录"""
        user = User(username='john', email='john@example.com')
        db.session.add(user)  # 添加到会话
        db.session.commit()  # 提交事务
        return user
    
    @staticmethod
    def add_multiple():
        """批量添加"""
        users = [
            User(username='user1', email='user1@example.com'),
            User(username='user2', email='user2@example.com'),
            User(username='user3', email='user3@example.com')
        ]
        db.session.add_all(users)  # 批量添加
        db.session.commit()
        return users
    
    @staticmethod
    def update_example():
        """更新记录"""
        user = User.query.filter_by(username='john').first()
        if user:
            user.email = 'john.doe@example.com'
            db.session.commit()
        return user
    
    @staticmethod
    def delete_example():
        """删除记录"""
        user = User.query.filter_by(username='john').first()
        if user:
            db.session.delete(user)
            db.session.commit()
        return True
    
    @staticmethod
    def merge_example():
        """合并对象(将分离状态的对象合并到会话)"""
        user = User(id=1, username='john_updated', email='new@example.com')
        merged = db.session.merge(user)  # 合并到会话
        db.session.commit()
        return merged
    
    @staticmethod
    def expunge_example():
        """从会话中移除对象"""
        user = User(username='temp', email='temp@example.com')
        db.session.add(user)
        db.session.expunge(user)  # 移除但不删除
        # user对象现在处于分离状态
    
    @staticmethod
    def flush_example():
        """刷新会话(发送SQL但不提交)"""
        user = User(username='flush_test', email='flush@example.com')
        db.session.add(user)
        db.session.flush()  # 发送INSERT,获取自增ID
        print(f"User ID: {user.id}")  # 此时ID已生成
        db.session.commit()


# 事务管理
class TransactionManager:
    """事务管理示例"""
    
    @staticmethod
    @contextmanager
    def transaction():
        """事务上下文管理器"""
        try:
            yield db.session
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            raise e
    
    @staticmethod
    def nested_transaction():
        """嵌套事务(SAVEPOINT)"""
        user1 = User(username='user1', email='user1@example.com')
        db.session.add(user1)
        
        # 开始嵌套事务
        savepoint = db.session.begin_nested()
        try:
            user2 = User(username='user2', email='user2@example.com')
            db.session.add(user2)
            # 模拟错误
            if some_condition:
                raise ValueError("Something went wrong")
            savepoint.commit()
        except Exception:
            savepoint.rollback()
            # user1仍然在会话中,user2被回滚
        
        db.session.commit()
    
    @staticmethod
    def bulk_operations():
        """批量操作(性能优化)"""
        # 批量插入(不触发ORM事件)
        users = [
            {'username': f'user{i}', 'email': f'user{i}@example.com'}
            for i in range(1000)
        ]
        db.session.bulk_insert_mappings(User, users)
        db.session.commit()
        
        # 批量更新
        updates = [
            {'id': i, 'username': f'updated_user{i}'}
            for i in range(1, 100)
        ]
        db.session.bulk_update_mappings(User, updates)
        db.session.commit()

5.2 查询API

from sqlalchemy import or_, and_, not_, func, desc, asc, case, literal_column
from sqlalchemy.sql.expression import select, exists

class QueryExamples:
    """查询示例"""
    
    @staticmethod
    def basic_queries():
        """基本查询"""
        # 查询所有
        all_users = User.query.all()
        
        # 根据主键查询
        user = User.query.get(1)  # 返回None如果不存在
        user = db.session.get(User, 1)  # SQLAlchemy 2.0推荐方式
        
        # 根据主键查询(不存在则抛出404)
        user = User.query.get_or_404(1)
        
        # 根据条件查询第一条
        user = User.query.filter_by(username='john').first()
        user = User.query.filter_by(username='john').first_or_404()
        
        # 根据条件查询所有
        users = User.query.filter_by(role='admin').all()
        
        # 查询数量
        count = User.query.filter_by(active=True).count()
        
        # 检查是否存在
        exists = db.session.query(User.query.filter_by(username='john').exists()).scalar()
        
        # 获取第一条结果的某个字段
        email = User.query.filter_by(username='john').with_entities(User.email).scalar()
        
        # 获取所有结果的某个字段
        emails = User.query.with_entities(User.email).all()
        
        return all_users
    
    @staticmethod
    def filter_operations():
        """过滤操作"""
        # 比较操作
        users = User.query.filter(User.age > 18).all()
        users = User.query.filter(User.age >= 18).all()
        users = User.query.filter(User.age < 60).all()
        users = User.query.filter(User.age <= 60).all()
        users = User.query.filter(User.age == 25).all()
        users = User.query.filter(User.age != 25).all()
        
        # 模糊匹配
        users = User.query.filter(User.name.like('%john%')).all()
        users = User.query.filter(User.name.ilike('%JOHN%')).all()  # 不区分大小写
        users = User.query.filter(User.name.notlike('%spam%')).all()
        
        # IN操作
        users = User.query.filter(User.id.in_([1, 2, 3])).all()
        users = User.query.filter(User.id.notin_([1, 2, 3])).all()
        
        # NULL检查
        users = User.query.filter(User.email.is_(None)).all()
        users = User.query.filter(User.email.isnot(None)).all()
        
        # 范围查询
        users = User.query.filter(User.age.between(18, 30)).all()
        
        # 字符串操作
        users = User.query.filter(User.name.startswith('J')).all()
        users = User.query.filter(User.name.endswith('n')).all()
        users = User.query.filter(User.name.contains('oh')).all()
        
        # 正则表达式(PostgreSQL)
        # users = User.query.filter(User.name.op('~')('^J')).all()
        
        return users
    
    @staticmethod
    def logical_operations():
        """逻辑操作"""
        # AND操作
        users = User.query.filter(
            and_(User.age > 18, User.active == True)
        ).all()
        
        # AND操作(链式调用)
        users = User.query.filter(User.age > 18).filter(User.active == True).all()
        
        # OR操作
        users = User.query.filter(
            or_(User.role == 'admin', User.role == 'moderator')
        ).all()
        
        # NOT操作
        users = User.query.filter(
            not_(User.active == False)
        ).all()
        
        # 复杂组合
        users = User.query.filter(
            or_(
                and_(User.age > 18, User.role == 'admin'),
                and_(User.age > 21, User.role == 'moderator')
            )
        ).all()
        
        return users
    
    @staticmethod
    def ordering_and_limiting():
        """排序和限制"""
        # 排序
        users = User.query.order_by(User.created_at).all()  # 升序
        users = User.query.order_by(desc(User.created_at)).all()  # 降序
        users = User.query.order_by(User.last_name, User.first_name).all()  # 多字段排序
        users = User.query.order_by(User.last_name.asc(), User.first_name.desc()).all()
        
        # 限制数量
        users = User.query.limit(10).all()
        users = User.query.offset(20).limit(10).all()  # 分页
        users = User.query[20:30]  # 切片语法
        
        # 获取第一条
        user = User.query.first()
        user = User.query.order_by(User.created_at.desc()).first()
        
        # 获取一条或None
        user = User.query.filter_by(username='john').one_or_none()
        
        # 获取一条(结果不唯一时抛出异常)
        user = User.query.filter_by(id=1).one()
        
        return users
    
    @staticmethod
    def aggregation_functions():
        """聚合函数"""
        # COUNT
        count = db.session.query(func.count(User.id)).scalar()
        count = db.session.query(func.count('*')).select_from(User).scalar()
        count = User.query.count()
        
        # SUM
        total = db.session.query(func.sum(Order.amount)).scalar()
        
        # AVG
        avg_age = db.session.query(func.avg(User.age)).scalar()
        
        # MIN/MAX
        min_age = db.session.query(func.min(User.age)).scalar()
        max_age = db.session.query(func.max(User.age)).scalar()
        
        # GROUP BY
        from sqlalchemy import distinct
        result = db.session.query(
            User.role,
            func.count(User.id).label('count')
        ).group_by(User.role).all()
        
        # HAVING
        result = db.session.query(
            User.role,
            func.count(User.id).label('count')
        ).group_by(User.role).having(func.count(User.id) > 5).all()
        
        # DISTINCT
        roles = db.session.query(User.role).distinct().all()
        count = db.session.query(func.count(distinct(User.role))).scalar()
        
        return result
    
    @staticmethod
    def join_queries():
        """连接查询"""
        # 内连接
        result = db.session.query(User, Order).join(Order).all()
        
        # 指定连接条件
        result = db.session.query(User).join(
            Order, User.id == Order.user_id
        ).all()
        
        # 左外连接
        result = db.session.query(User).outerjoin(Order).all()
        
        # 多表连接
        result = db.session.query(User, Order, OrderItem)\
            .join(Order)\
            .join(OrderItem)\
            .all()
        
        # 使用relationship的join
        result = db.session.query(User).join(User.orders).all()
        
        # 自连接
        result = db.session.query(Employee, Employee)\
            .join(Employee, Employee.manager_id == Employee.id)\
            .all()
        
        # 子查询
        subq = db.session.query(Order.user_id, func.count('*').label('order_count'))\
            .group_by(Order.user_id).subquery()
        
        result = db.session.query(User, subq.c.order_count)\
            .outerjoin(subq, User.id == subq.c.user_id)\
            .all()
        
        return result
    
    @staticmethod
    def advanced_queries():
        """高级查询"""
        # CASE表达式
        result = db.session.query(
            User.name,
            case(
                (User.age < 18, 'minor'),
                (User.age < 65, 'adult'),
                else_='senior'
            ).label('age_group')
        ).all()
        
        # UNION
        q1 = db.session.query(User.name).filter(User.role == 'admin')
        q2 = db.session.query(User.name).filter(User.role == 'moderator')
        result = q1.union(q2).all()
        
        # UNION ALL
        result = q1.union_all(q2).all()
        
        # EXISTS子查询
        result = User.query.filter(
            exists().where(Order.user_id == User.id)
        ).all()
        
        # NOT EXISTS
        result = User.query.filter(
            ~exists().where(Order.user_id == User.id)
        ).all()
        
        # 原生SQL
        result = db.session.execute(
            "SELECT * FROM users WHERE age > :age",
            {'age': 18}
        ).fetchall()
        
        # 使用text
        from sqlalchemy import text
        result = db.session.query(User).from_statement(
            text("SELECT * FROM users WHERE username = :username")
        ).params(username='john').all()
        
        return result

5.3 分页查询

from flask import request, jsonify

class PaginationExample:
    """分页示例"""
    
    @staticmethod
    def basic_pagination():
        """基本分页"""
        # paginate参数: page=页码, per_page=每页数量, error_out=超出范围是否报错
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)
        
        pagination = User.query.paginate(
            page=page,
            per_page=per_page,
            error_out=False  # 超出范围返回空列表而非404
        )
        
        # 分页对象属性
        result = {
            'items': [user.to_dict() for user in pagination.items],  # 当前页数据
            'total': pagination.total,  # 总记录数
            'pages': pagination.pages,  # 总页数
            'page': pagination.page,  # 当前页码
            'per_page': pagination.per_page,  # 每页数量
            'has_next': pagination.has_next,  # 是否有下一页
            'has_prev': pagination.has_prev,  # 是否有上一页
            'next_num': pagination.next_num,  # 下一页页码
            'prev_num': pagination.prev_num,  # 上一页页码
        }
        
        return result
    
    @staticmethod
    def iter_pages_example():
        """页码迭代"""
        pagination = User.query.paginate(page=1, per_page=10)
        
        # iter_pages参数: left_edge=左边缘页数, left_current=当前页左边页数
        # right_current=当前页右边页数, right_edge=右边缘页数
        page_numbers = list(pagination.iter_pages(
            left_edge=2,
            left_current=2,
            right_current=2,
            right_edge=2
        ))
        # 示例输出: [1, 2, None, 4, 5, 6, 7, 8, None, 99, 100]
        # None表示省略号
        
        return page_numbers
    
    @staticmethod
    def optimized_pagination():
        """优化分页(大数据量)"""
        # 使用seek method避免OFFSET性能问题
        last_id = request.args.get('last_id', 0, type=int)
        per_page = 20
        
        # 传统方式(大数据量时性能差)
        # users = User.query.order_by(User.id).offset((page-1)*per_page).limit(per_page).all()
        
        # Seek方式(推荐)
        users = User.query.filter(User.id > last_id)\
            .order_by(User.id)\
            .limit(per_page)\
            .all()
        
        return {
            'items': [user.to_dict() for user in users],
            'last_id': users[-1].id if users else None,
            'has_more': len(users) == per_page
        }


# Flask路由中的分页示例
@app.route('/api/users')
def get_users():
    """用户列表API(带分页)"""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    
    # 验证参数
    if per_page > 100:
        per_page = 100
    
    # 查询
    query = User.query.filter_by(active=True).order_by(User.created_at.desc())
    pagination = query.paginate(page=page, per_page=per_page)
    
    return jsonify({
        'data': [user.to_dict() for user in pagination.items],
        'pagination': {
            'total': pagination.total,
            'pages': pagination.pages,
            'page': pagination.page,
            'per_page': pagination.per_page,
            'has_next': pagination.has_next,
            'has_prev': pagination.has_prev,
        }
    })

5.4 数据库操作流程图

数据库 数据库引擎 数据库会话 视图函数 客户端 数据库 数据库引擎 数据库会话 视图函数 客户端 创建/查询对象 异常处理 alt [发生异常] HTTP请求 session.add(obj) 添加到会话(未持久化) session.commit() 开始事务 执行SQL 返回结果 提交成功 返回 HTTP响应 session.rollback() 抛出异常 错误响应

六、Flask-Migrate数据库迁移

6.1 迁移配置

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Flask-Script集成(旧版Flask)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

# 迁移目录结构
# migrations/
# ├── alembic.ini        # Alembic配置
# ├── env.py             # 运行环境配置
# ├── script.py.mako     # 迁移脚本模板
# └── versions/          # 迁移版本文件
#     ├── 123456789abc_initial.py
#     └── 234567890def_add_user_table.py

6.2 迁移命令详解

# 初始化迁移环境
flask db init
# 创建migrations目录,包含Alembic配置

# 生成迁移脚本
flask db migrate -m "initial migration"
# 自动检测模型变化,生成迁移脚本

# 应用迁移
flask db upgrade
# 将数据库升级到最新版本

# 回退迁移
flask db downgrade
# 回退到上一个版本
flask db downgrade <revision>
# 回退到指定版本

# 查看迁移历史
flask db history
# 显示所有迁移版本

# 查看当前版本
flask db current
# 显示当前数据库版本

# 编辑迁移脚本
flask db edit <revision>
# 使用编辑器打开迁移脚本

# 标记版本(不执行迁移)
flask db stamp
# 将当前数据库标记为最新版本
flask db stamp <revision>
# 标记为指定版本

# 合并迁移
flask db merge <revision1> <revision2> -m "merge message"
# 合并多个分支

# 生成空迁移脚本
flask db revision -m "empty migration"
# 创建空的迁移脚本,手动编写SQL

6.3 迁移脚本结构

# migrations/versions/123456789abc_add_user_table.py

"""add user table

Revision ID: 123456789abc
Revises: 
Create Date: 2024-01-15 10:30:00.000000

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '123456789abc'  # 当前版本ID
down_revision = None  # 父版本ID(None表示初始迁移)
branch_labels = None  # 分支标签
depends_on = None  # 依赖的其他迁移


def upgrade():
    """升级操作"""
    # 创建表
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(length=80), nullable=False),
        sa.Column('email', sa.String(length=120), nullable=False),
        sa.Column('password_hash', sa.String(length=128), nullable=True),
        sa.Column('active', sa.Boolean(), nullable=True, server_default='1'),
        sa.Column('created_at', sa.DateTime(), nullable=True, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(), nullable=True, onupdate=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('username'),
        sa.UniqueConstraint('email')
    )
    
    # 创建索引
    op.create_index('idx_users_username', 'users', ['username'], unique=False)
    op.create_index('idx_users_email', 'users', ['email'], unique=False)
    
    # 添加外键
    # op.create_foreign_key('fk_posts_user', 'posts', 'users', ['user_id'], ['id'])


def downgrade():
    """降级操作"""
    # 删除外键
    # op.drop_constraint('fk_posts_user', 'posts', type_='foreignkey')
    
    # 删除索引
    op.drop_index('idx_users_email', table_name='users')
    op.drop_index('idx_users_username', table_name='users')
    
    # 删除表
    op.drop_table('users')

6.4 常用迁移操作

# migrations/versions/234567890def_various_operations.py

"""various operations

Revision ID: 234567890def
Revises: 123456789abc
Create Date: 2024-01-16 14:20:00.000000

"""
from alembic import op
import sqlalchemy as sa


def upgrade():
    # 添加列
    op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))
    op.add_column('users', sa.Column('bio', sa.Text, nullable=True))
    
    # 添加带默认值的列
    op.add_column(
        'users',
        sa.Column('role', sa.String(20), nullable=False, server_default='user')
    )
    
    # 修改列
    op.alter_column(
        'users',
        'username',
        existing_type=sa.String(80),
        type_=sa.String(100),
        existing_nullable=False,
        nullable=False
    )
    
    # 重命名列
    op.alter_column('users', 'phone', new_column_name='phone_number')
    
    # 删除列
    op.drop_column('users', 'old_column')
    
    # 创建表
    op.create_table(
        'posts',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('content', sa.Text, nullable=True),
        sa.Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now())
    )
    
    # 创建唯一约束
    op.create_unique_constraint('uq_posts_title', 'posts', ['title'])
    
    # 创建检查约束
    op.create_check_constraint(
        'ck_posts_title_length',
        'posts',
        'length(title) > 0'
    )
    
    # 执行原生SQL
    op.execute("UPDATE users SET role = 'user' WHERE role IS NULL")
    
    # 批量操作(对不支持某些操作的数据库如SQLite)
    with op.batch_alter_table('comments') as batch_op:
        batch_op.add_column(sa.Column('approved', sa.Boolean, default=False))
        batch_op.alter_column('content', nullable=False)
        batch_op.create_unique_constraint('uq_comments_hash', ['hash'])


def downgrade():
    # 回退批量操作
    with op.batch_alter_table('comments') as batch_op:
        batch_op.drop_constraint('uq_comments_hash', type_='unique')
        batch_op.alter_column('content', nullable=True)
        batch_op.drop_column('approved')
    
    # 删除检查约束
    op.drop_constraint('ck_posts_title_length', 'posts', type_='check')
    
    # 删除唯一约束
    op.drop_constraint('uq_posts_title', 'posts', type_='unique')
    
    # 删除表
    op.drop_table('posts')
    
    # 恢复列名
    op.alter_column('users', 'phone_number', new_column_name='phone')
    
    # 删除列
    op.drop_column('users', 'role')
    op.drop_column('users', 'bio')
    op.drop_column('users', 'phone')

6.5 多环境迁移配置

# migrations/env.py

from __future__ import with_statement
import logging
from logging.config import fileConfig
from flask import current_app
from alembic import context

config = context.config
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')

# 获取Flask应用的元数据
def get_metadata():
    if hasattr(current_app, 'extensions'):
        return current_app.extensions['migrate'].db.metadata
    return None

target_metadata = get_metadata()

def run_migrations_offline():
    """离线模式运行迁移"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        compare_type=True,  # 比较列类型变化
        compare_server_default=True,  # 比较默认值变化
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """在线模式运行迁移"""
    def process_revision_directives(context, revision, directives):
        # 自动生成迁移时的自定义处理
        if config.cmd_opts.autogenerate:
            script = directives[0]
            if script.upgrade_ops.is_empty():
                directives[:] = []
                logger.info('No changes in schema detected.')

    connectable = current_app.extensions['migrate'].db.get_engine()

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            process_revision_directives=process_revision_directives,
            compare_type=True,
            compare_server_default=True,
            **current_app.extensions['migrate'].configure_args
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

6.6 迁移工作流程图

部署阶段

开发阶段

修改模型定义

flask db migrate

检测到变化?

生成迁移脚本

提示无变化

检查/编辑迁移脚本

flask db upgrade

测试迁移

测试通过?

flask db downgrade

提交代码

拉取代码

flask db upgrade

迁移成功?

启动应用

回滚迁移

排查问题

七、最佳实践

7.1 模型设计最佳实践

from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
import uuid

class User(db.Model):
    """用户模型最佳实践示例"""
    __tablename__ = 'users'
    
    # 主键:使用自增整数或UUID
    id = Column(Integer, primary_key=True)
    # 或使用UUID
    # id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    
    # 唯一标识字段
    username = Column(String(80), unique=True, nullable=False, index=True)
    email = Column(String(120), unique=True, nullable=False, index=True)
    
    # 密码:存储哈希值,不存储明文
    password_hash = Column(String(128), nullable=False)
    
    # 状态字段
    active = Column(Boolean, default=True, nullable=False)
    confirmed = Column(Boolean, default=False, nullable=False)
    
    # 时间戳
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_login_at = Column(DateTime)
    
    # 软删除
    deleted_at = Column(DateTime, nullable=True)
    
    # 个人信息
    name = Column(String(100))
    avatar = Column(String(200))
    bio = Column(Text)
    
    # 关系
    profile = relationship('UserProfile', backref='user', uselist=False, 
                          cascade='all, delete-orphan')
    posts = relationship('Post', backref='author', lazy='dynamic',
                        cascade='all, delete-orphan')
    
    # 约束
    __table_args__ = (
        CheckConstraint("email LIKE '%@%'", name='ck_email_format'),
        Index('idx_users_email_username', 'email', 'username'),
    )
    
    def set_password(self, password):
        """设置密码(存储哈希)"""
        self.password_hash = generate_password_hash(password)
    
    def check_password(self, password):
        """验证密码"""
        return check_password_hash(self.password_hash, password)
    
    def soft_delete(self):
        """软删除"""
        self.deleted_at = datetime.utcnow()
        db.session.commit()
    
    def restore(self):
        """恢复软删除"""
        self.deleted_at = None
        db.session.commit()
    
    @property
    def is_deleted(self):
        """是否已删除"""
        return self.deleted_at is not None
    
    @classmethod
    def active_users(cls):
        """查询活跃用户"""
        return cls.query.filter(cls.active == True, cls.deleted_at == None)
    
    def to_dict(self, include_sensitive=False):
        """序列化"""
        data = {
            'id': self.id,
            'username': self.username,
            'email': self.email if include_sensitive else None,
            'name': self.name,
            'avatar': self.avatar,
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }
        return {k: v for k, v in data.items() if v is not None}
    
    def __repr__(self):
        return f'<User {self.username}>'


# 多对多关联表最佳实践
user_role = Table(
    'user_role',
    db.Model.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), 
           primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), 
           primary_key=True),
    Column('assigned_at', DateTime, default=datetime.utcnow),
    Column('assigned_by', Integer, ForeignKey('users.id')),
    # 复合索引
    Index('idx_user_role', 'user_id', 'role_id'),
)


class Role(db.Model):
    """角色模型"""
    __tablename__ = 'roles'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(String(200))
    permissions = Column(JSON)  # 存储权限列表
    
    users = relationship('User', secondary=user_role, backref='roles')
    
    def has_permission(self, permission):
        """检查权限"""
        if self.permissions:
            return permission in self.permissions
        return False

7.2 查询优化最佳实践

from sqlalchemy.orm import lazyload, joinedload, subqueryload, load_only

class QueryOptimization:
    """查询优化示例"""
    
    @staticmethod
    def avoid_n_plus_one():
        """避免N+1查询问题"""
        # 错误方式(N+1问题)
        users = User.query.all()
        for user in users:
            print(user.profile.bio)  # 每次循环都发出一条SQL
        
        # 正确方式1:joinedload
        users = User.query.options(joinedload(User.profile)).all()
        for user in users:
            print(user.profile.bio)  # 不再发出额外SQL
        
        # 正确方式2:subqueryload
        users = User.query.options(subqueryload(User.posts)).all()
        
        # 正确方式3:在relationship中配置lazy='joined'
    
    @staticmethod
    def select_specific_columns():
        """只查询需要的列"""
        # 错误方式(查询所有列)
        users = User.query.all()
        
        # 正确方式:只查询需要的列
        users = User.query.with_entities(
            User.id, User.username, User.email
        ).all()
        
        # 使用load_only
        users = User.query.options(
            load_only(User.id, User.username, User.email)
        ).all()
    
    @staticmethod
    def use_indexed_columns():
        """使用索引列进行查询"""
        # 确保查询条件使用索引列
        user = User.query.filter_by(email='user@example.com').first()  # email有索引
        users = User.query.filter(User.username.like('j%')).all()  # username有索引
    
    @staticmethod
    def batch_operations():
        """批量操作优化"""
        # 批量插入
        users = [
            User(username=f'user{i}', email=f'user{i}@example.com',
                 password_hash='hash')
            for i in range(1000)
        ]
        
        # 方式1:add_all(触发ORM事件)
        db.session.add_all(users)
        db.session.commit()
        
        # 方式2:bulk_insert_mappings(不触发ORM事件,更快)
        db.session.bulk_insert_mappings(User, [
            {'username': f'user{i}', 'email': f'user{i}@example.com',
             'password_hash': 'hash'}
            for i in range(1000)
        ])
    
    @staticmethod
    def pagination_optimization():
        """分页优化"""
        # 大数据量时避免使用OFFSET
        # 使用seek method
        last_id = 0
        page_size = 100
        
        while True:
            users = User.query.filter(User.id > last_id)\
                .order_by(User.id)\
                .limit(page_size)\
                .all()
            
            if not users:
                break
            
            for user in users:
                process(user)
            
            last_id = users[-1].id
    
    @staticmethod
    def connection_pooling():
        """连接池配置"""
        app.config['SQLALCHEMY_POOL_SIZE'] = 20  # 连接池大小
        app.config['SQLALCHEMY_MAX_OVERFLOW'] = 10  # 最大溢出连接
        app.config['SQLALCHEMY_POOL_TIMEOUT'] = 30  # 获取连接超时
        app.config['SQLALCHEMY_POOL_RECYCLE'] = 3600  # 连接回收时间
        app.config['SQLALCHEMY_POOL_PRE_PING'] = True  # 连接前检查

7.3 事务管理最佳实践

from contextlib import contextmanager
from sqlalchemy.exc import IntegrityError, OperationalError

class TransactionBestPractices:
    """事务管理最佳实践"""
    
    @staticmethod
    @contextmanager
    def auto_commit():
        """自动提交上下文管理器"""
        try:
            yield
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
    
    @staticmethod
    def handle_integrity_error():
        """处理完整性错误"""
        try:
            user = User(username='existing_user', email='test@example.com')
            db.session.add(user)
            db.session.commit()
        except IntegrityError as e:
            db.session.rollback()
            if 'unique constraint' in str(e):
                return {'error': '用户名或邮箱已存在'}
            raise
    
    @staticmethod
    def use_savepoint():
        """使用保存点"""
        user1 = User(username='user1', email='user1@example.com')
        db.session.add(user1)
        
        savepoint = db.session.begin_nested()
        try:
            user2 = User(username='user2', email='user2@example.com')
            db.session.add(user2)
            # 某些可能失败的操作
            risky_operation()
            savepoint.commit()
        except Exception:
            savepoint.rollback()
            # user1仍然存在,user2被回滚
        
        db.session.commit()
    
    @staticmethod
    def session_lifecycle():
        """会话生命周期管理"""
        # 每个请求结束后自动移除会话
        @app.teardown_appcontext
        def shutdown_session(exception=None):
            db.session.remove()
        
        # 或使用Flask-SQLAlchemy的自动管理
        # Flask-SQLAlchemy会自动处理会话生命周期
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐