FastApi之Tortoise ORM使用
·
1. 异步 ORM 基础与架构
1.1 核心概念与设计理念
Tortoise ORM 是一个全异步的 Python ORM 框架,设计灵感来源于 Django ORM,但专为异步环境打造。它的核心设计理念是提供与 Django ORM 类似的 API,同时充分利用 Python 的 asyncio 库实现异步数据库操作。
核心组件:
- 模型系统:定义数据库表结构的对象模型
- 查询集:链式调用的查询构建系统
- 数据库后端:支持多种数据库的异步实现
- 连接池:管理数据库连接的高效机制
- 事务管理:支持异步事务操作
与其他 ORM 框架的对比:
| 特性 | Tortoise ORM | Django ORM | SQLAlchemy |
|---|---|---|---|
| 异步支持 | 原生支持 | 有限支持(3.1+) | 部分支持(2.0+) |
| API 风格 | Django 风格 | Django 风格 | 多种风格 |
| 数据库支持 | PostgreSQL, MySQL, SQLite | 多种数据库 | 多种数据库 |
| 学习曲线 | 低 | 低 | 中高 |
| 性能 | 高(异步) | 中(同步) | 中(部分异步) |
1.2 安装与环境配置
# 安装 Tortoise ORM
pip install tortoise-orm
# 安装数据库驱动
pip install asyncpg # PostgreSQL
pip install aiomysql # MySQL
pip install aiosqlite # SQLite
# 安装 FastAPI
pip install fastapi uvicorn
# 安装其他依赖
pip install pydantic # 数据验证
pip install aerich # 数据库迁移
pip install aiocache # 异步缓存
pip install aioredis # 异步 Redis 客户端
pip install celery # 异步任务队列
pip install pytest # 测试框架
pip install pytest-asyncio # 异步测试支持
pip install python-dotenv # 环境变量管理
1.3 异步特性原理
Tortoise ORM 的异步特性基于 Python 的 asyncio 库,通过以下机制实现:
- 异步数据库驱动:使用
asyncpg、aiomysql等异步驱动 - 协程操作:所有数据库操作都使用
await关键字 - 事件循环:利用
asyncio事件循环处理并发操作 - 非阻塞 I/O:数据库操作不会阻塞主线程
1.4 配置管理与参数详解
from tortoise import Tortoise
async def init_db() -> None:
"""初始化数据库连接"""
try:
await Tortoise.init(
db_url="postgres://user:password@localhost:5432/mydb",
modules={"models": ["app.models"]},
# 连接池配置
pool_size=20, # 连接池大小,根据并发量调整
max_overflow=10, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间(秒),避免连接过期
pool_pre_ping=True, # 连接前ping数据库,确保连接有效
# 其他配置
echo=True, # 打印SQL语句
timeout=30 # 连接超时时间
)
# 创建表
await Tortoise.generate_schemas()
print("数据库初始化成功")
except Exception as e:
print(f"数据库初始化失败: {e}")
raise
# 运行初始化
import asyncio
asyncio.run(init_db())
多数据库支持:
from tortoise import Tortoise, connections
async def init_multiple_dbs() -> None:
"""初始化多个数据库连接"""
await Tortoise.init(
db_urls={
"default": "postgres://user:password@localhost",
"secondary": "mysql://user:password@localhost:3306/secondarydb"
},
modules={
"models": ["app.models"],
"secondary": ["app.secondary_models"]
}
)
# 创建表
await Tortoise.generate_schemas()
async def use_specific_database() -> None:
"""使用特定数据库执行操作"""
# 假设 User 模型已定义
from app.models import User
# 使用默认数据库
users = await User.all()
# 使用特定数据库
from app.secondary_models import Product
async with connections["secondary"].transaction() as conn:
# 在 secondary 数据库中执行操作
products = await Product.all(using_db=conn)
# 创建新产品
product = Product(name="New Product", price=99.99)
await product.save(using_db=conn)
async def cross_database_query() -> None:
"""跨数据库查询示例"""
# 从默认数据库获取用户
from app.models import User
users = await User.all()
# 从 secondary 数据库获取产品
from app.secondary_models import Product
products = await Product.all()
# 处理跨数据库数据
for user in users:
print(f"User: {user.name}")
for product in products:
print(f"Product: {product.name}, Price: {product.price}")
# 运行初始化
import asyncio
asyncio.run(init_multiple_dbs())
asyncio.run(use_specific_database())
asyncio.run(cross_database_query())
2. 模型定义与字段系统
2.1 模型创建与继承
from tortoise import fields
from tortoise.models import Model
class BaseModel(Model):
id = fields.IntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
abstract = True
class User(BaseModel):
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
is_active = fields.BooleanField(default=True)
class Meta:
table = "users"
indexes = [
("name", "email"), # 复合索引
]
2.2 字段类型与参数详解
| 字段类型 | 描述 | 常用参数 |
|---|---|---|
| IntField | 整数 | pk, default, null, unique, index |
| CharField | 字符串 | max_length, default, null, unique, index |
| TextField | 长文本 | default, null |
| BooleanField | 布尔值 | default, null |
| DatetimeField | 日期时间 | auto_now, auto_now_add, default, null |
| FloatField | 浮点数 | default, null |
| ForeignKeyField | 外键 | related_model, related_name, on_delete, null |
| ManyToManyField | 多对多 | related_model, related_name, through |
| JSONField | JSON数据 | default, null |
| UUIDField | UUID | pk, default, unique, index |
| DateField | 日期 | auto_now, auto_now_add, default, null |
| TimeField | 时间 | auto_now, auto_now_add, default, null |
2.3 索引与约束优化
from tortoise import fields
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50, index=True)
email = fields.CharField(max_length=100, unique=True, index=True)
age = fields.IntField(index=True)
created_at = fields.DatetimeField(auto_now_add=True)
class Meta:
table = "users"
indexes = [
("name", "email"), # 复合索引
("age", "created_at"), # 复合索引
]
unique_together = [
("name", "email"), # 联合唯一约束
]
2.4 模型继承高级特性
from tortoise import fields
from tortoise.models import Model
# 基础模型
class BaseModel(Model):
id = fields.IntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
abstract = True
# 单表继承
class Person(BaseModel):
name = fields.CharField(max_length=50)
type = fields.CharField(max_length=20)
class Meta:
abstract = True
class Employee(Person):
employee_id = fields.CharField(max_length=20)
department = fields.CharField(max_length=50)
class Customer(Person):
customer_id = fields.CharField(max_length=20)
balance = fields.FloatField(default=0.0)
3. 查询构建与数据操作
3.1 基础查询操作
from tortoise import fields
from tortoise.models import Model
from datetime import datetime
# 完整的 User 模型定义
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
age = fields.IntField(null=True)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "users"
async def basic_queries():
"""基础查询示例"""
# 获取所有用户
users = await User.all()
# 根据ID获取用户
user = await User.get(id=1)
# 根据ID获取用户,如果不存在返回None
user = await User.get_or_none(id=1)
# 过滤查询
active_users = await User.filter(is_active=True)
users_with_name = await User.filter(name__contains="Alice")
users_with_age = await User.filter(age__gte=18, age__lte=30)
# 排序
sorted_users = await User.order_by("-created_at")
# 分页
paginated_users = await User.offset(10).limit(20)
# 字段选择
users = await User.only("id", "name", "email")
users = await User.exclude("password_hash")
# 运行示例
import asyncio
asyncio.run(basic_queries())
3.2 关系查询与预加载
from tortoise import fields
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
profile = fields.OneToOneField("models.Profile", related_name="user", null=True)
posts = fields.ReverseRelation["Post"]
class Profile(Model):
id = fields.IntField(pk=True)
bio = fields.TextField()
user = fields.OneToOneField("models.User", related_name="profile")
class Post(Model):
id = fields.IntField(pk=True)
title = fields.CharField(max_length=100)
content = fields.TextField()
user = fields.ForeignKeyField("models.User", related_name="posts")
tags = fields.ManyToManyField("models.Tag", related_name="posts")
class Tag(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
posts = fields.ManyToManyField("models.Post", related_name="tags")
async def relation_queries():
"""关系查询示例"""
try:
# 预加载关系
users_with_posts = await User.all().prefetch_related("posts")
print(f"预加载用户帖子成功,获取了 {len(users_with_posts)} 个用户")
posts_with_user_and_tags = await Post.all().prefetch_related("user", "tags")
print(f"预加载帖子用户和标签成功,获取了 {len(posts_with_user_and_tags)} 个帖子")
# 嵌套预加载
users_with_posts_and_tags = await User.all().prefetch_related("posts__tags")
print(f"嵌套预加载成功,获取了 {len(users_with_posts_and_tags)} 个用户")
except Exception as e:
print(f"关系查询失败: {e}")
raise
# 运行示例
import asyncio
asyncio.run(relation_queries())
3.3 复杂查询与聚合函数
from tortoise.functions import Count, Sum, Avg, Max, Min, Window
from tortoise import Subquery
from tortoise.query_utils import Q
from tortoise.expressions import RowNumber
async def complex_queries():
"""复杂查询示例"""
# 假设模型已定义
from app.models import User, Post, Tag
# 聚合查询
user_count = await User.filter(is_active=True).count()
post_count = await Post.filter(user_id=1).count()
avg_age = await User.all().annotate(avg_age=Avg("age")).first()
# 分组查询
tag_post_counts = await Tag.annotate(post_count=Count("posts")).order_by("-post_count")
# 子查询
subquery = Post.filter(user_id=1).values("id")
posts = await Post.filter(id__in=Subquery(subquery))
# Q 对象查询
users = await User.filter(Q(name__contains="Alice") | Q(email__contains="alice"))
users = await User.filter(Q(age__gte=18) & Q(is_active=True))
async def advanced_queries():
"""高级查询示例"""
# 假设模型已定义
from app.models import Post
# 窗口函数
posts = await Post.annotate(
row_number=Window(
function=RowNumber(),
partition_by=["user_id"],
order_by=["-created_at"]
)
).all()
# 复杂子查询
# 获取每个用户的最新帖子
subquery = Post.annotate(
row_number=Window(
function=RowNumber(),
partition_by=["user_id"],
order_by=["-created_at"]
)
).filter(row_number=1).values("id")
latest_posts = await Post.filter(id__in=Subquery(subquery))
# 运行示例
import asyncio
asyncio.run(complex_queries())
asyncio.run(advanced_queries())
3.4 批量操作与性能优化
from tortoise import fields
from tortoise.models import Model
from typing import List
from datetime import datetime
# 完整的 User 模型定义
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
age = fields.IntField(null=True)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "users"
async def batch_operations():
"""批量操作示例"""
try:
# 批量创建
users: List[User] = [
User(name=f"User {i}", email=f"user{i}@example.com")
for i in range(100)
]
await User.bulk_create(users, batch_size=50) # 每批处理50个
print("批量创建成功")
# 批量更新
updated_count = await User.filter(is_active=True).update(name="Active User")
print(f"批量更新成功,更新了 {updated_count} 条记录")
# 批量删除
deleted_count = await User.filter(age__lt=18).delete()
print(f"批量删除成功,删除了 {deleted_count} 条记录")
# 原生 SQL 查询
users = await User.raw("SELECT * FROM users WHERE age > ?", [18])
print(f"原生 SQL 查询成功,获取了 {len(users)} 条记录")
except Exception as e:
print(f"批量操作失败: {e}")
raise
# 运行示例
import asyncio
asyncio.run(batch_operations())
4. 数据验证与序列化
4.1 Pydantic 集成
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=50)
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
4.2 数据验证机制
from tortoise import fields
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
password_hash = fields.CharField(max_length=255) # 存储哈希后的密码
age = fields.IntField(null=True)
async def clean(self):
"""自定义验证逻辑"""
if self.age is not None and (self.age < 0 or self.age > 120):
raise ValueError("Age must be between 0 and 120")
if len(self.name) < 2:
raise ValueError("Name must be at least 2 characters")
async def set_password(self, password: str):
"""设置密码,自动进行哈希处理"""
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.password_hash = pwd_context.hash(password)
async def verify_password(self, password: str) -> bool:
"""验证密码"""
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
return pwd_context.verify(password, self.password_hash)
async def validation_example():
"""验证示例"""
user = User(name="A", email="a@example.com", age=150)
try:
await user.save()
except ValueError as e:
print(f"Validation error: {e}")
# 运行示例
import asyncio
asyncio.run(validation_example())
4.3 序列化与反序列化
from tortoise import fields
from tortoise.models import Model
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
# 完整的 User 模型定义
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "users"
# Pydantic 模型
class UserBase(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=50)
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
async def serialization_example():
"""序列化示例"""
# 模型转 Pydantic
user = await User.get(id=1)
user_response = UserResponse.model_validate(user)
# Pydantic 转模型
user_data = UserCreate(name="Alice", email="alice@example.com", password="secret")
user = User(name=user_data.name, email=user_data.email)
await user.save()
# 批量序列化
users = await User.all()
user_responses = [UserResponse.model_validate(user) for user in users]
# 运行示例
import asyncio
asyncio.run(serialization_example())
4.4 错误处理与异常捕获
from tortoise import fields
from tortoise.models import Model
from tortoise.exceptions import DoesNotExist, IntegrityError
from fastapi import HTTPException
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
# 完整的 User 模型定义
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "users"
# Pydantic 模型
class UserBase(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=50)
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
async def get_user(user_id: int):
"""根据ID获取用户"""
try:
user = await User.get(id=user_id)
return user
except DoesNotExist:
raise HTTPException(status_code=404, detail="User not found")
async def create_user(user_data: UserCreate):
"""创建新用户"""
try:
user = User(name=user_data.name, email=user_data.email)
await user.set_password(user_data.password) # 设置密码,自动哈希
await user.save()
return user
except IntegrityError:
raise HTTPException(status_code=400, detail="Email already registered")
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
5. FastAPI 集成
5.1 依赖注入设计
from fastapi import FastAPI, Depends
from tortoise import Tortoise
from typing import Generator
app = FastAPI()
async def get_db() -> Generator[None, None, None]:
"""获取数据库连接"""
if not Tortoise._inited:
await Tortoise.init(
db_url="postgres://user:password@localhost:5432/mydb",
modules={"models": ["app.models"]}
)
yield
await Tortoise.close_connections()
@app.get("/users")
async def get_users(db: None = Depends(get_db)):
"""获取用户列表"""
from app.models import User
users = await User.all()
return users
5.2 API 路由与模型绑定
from fastapi import APIRouter, HTTPException
from tortoise import fields
from tortoise.models import Model
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
# 完整的 User 模型定义
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
table = "users"
# Pydantic 模型
class UserBase(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=50)
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# 假设已经创建了 FastAPI 应用实例
from fastapi import FastAPI
app = FastAPI()
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建新用户"""
existing_user = await User.filter(email=user.email).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
db_user = User(name=user.name, email=user.email)
await db_user.set_password(user.password) # 设置密码,自动哈希
await db_user.save()
return UserResponse.model_validate(db_user)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""获取用户详情"""
user = await User.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse.model_validate(user)
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate):
"""更新用户信息"""
db_user = await User.get_or_none(id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
update_data = user.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_user, field, value)
await db_user.save()
return UserResponse.model_validate(db_user)
@router.delete("/{user_id}")
async def delete_user(user_id: int):
"""删除用户"""
db_user = await User.get_or_none(id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
await db_user.delete()
return {"message": "User deleted"}
# 注册路由
app.include_router(router)
5.3 集成最佳实践
5.3.1 项目结构示例
my_project/
├── app/
│ ├── __init__.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py # 基础模型
│ │ ├── user.py # 用户模型
│ │ └── post.py # 帖子模型
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py # 用户相关 Pydantic 模型
│ │ └── post.py # 帖子相关 Pydantic 模型
│ ├── api/
│ │ ├── __init__.py
│ │ ├── user.py # 用户相关 API 路由
│ │ └── post.py # 帖子相关 API 路由
│ ├── services/
│ │ ├── __init__.py
│ │ ├── user_service.py # 用户业务逻辑
│ │ └── post_service.py # 帖子业务逻辑
│ ├── dependencies.py # 依赖注入
│ └── config.py # 配置管理
├── main.py # FastAPI 应用入口
├── requirements.txt # 依赖管理
├── aerich.ini # Aerich 配置
└── migrations/ # 数据库迁移文件
5.3.2 完整的 FastAPI 集成示例
# app/config.py
TORTOISE_ORM = {
"connections": {
"default": "postgres://user:password@localhost:5432/mydb"
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"],
"default_connection": "default",
},
},
}
# app/models/base.py
from tortoise import fields
from tortoise.models import Model
class BaseModel(Model):
id = fields.IntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
abstract = True
# app/models/user.py
from tortoise import fields
from app.models.base import BaseModel
class User(BaseModel):
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
password_hash = fields.CharField(max_length=255) # 存储哈希后的密码
is_active = fields.BooleanField(default=True)
posts = fields.ReverseRelation["Post"]
async def set_password(self, password: str):
"""设置密码,自动进行哈希处理"""
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.password_hash = pwd_context.hash(password)
async def verify_password(self, password: str) -> bool:
"""验证密码"""
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
return pwd_context.verify(password, self.password_hash)
# app/models/post.py
from tortoise import fields
from app.models.base import BaseModel
class Post(BaseModel):
title = fields.CharField(max_length=100)
content = fields.TextField()
user = fields.ForeignKeyField("models.User", related_name="posts")
# app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=50)
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# app/dependencies.py
from tortoise import Tortoise
from typing import Generator
async def get_db() -> Generator[None, None, None]:
"""获取数据库连接"""
if not Tortoise._inited:
from app.config import TORTOISE_ORM
await Tortoise.init(config=TORTOISE_ORM)
yield
await Tortoise.close_connections()
# app/api/user.py
from fastapi import APIRouter, HTTPException, Depends
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate, UserResponse
from app.dependencies import get_db
from typing import List
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate, db=None):
"""创建新用户"""
existing_user = await User.filter(email=user.email).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
db_user = User(name=user.name, email=user.email)
await db_user.save()
return UserResponse.model_validate(db_user)
@router.get("/", response_model=List[UserResponse])
async def get_users(db=None):
"""获取用户列表"""
users = await User.all()
return [UserResponse.model_validate(user) for user in users]
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db=None):
"""获取用户详情"""
user = await User.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse.model_validate(user)
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate, db=None):
"""更新用户信息"""
db_user = await User.get_or_none(id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
update_data = user.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_user, field, value)
await db_user.save()
return UserResponse.model_validate(db_user)
@router.delete("/{user_id}")
async def delete_user(user_id: int, db=None):
"""删除用户"""
db_user = await User.get_or_none(id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
await db_user.delete()
return {"message": "User deleted"}
# main.py
from fastapi import FastAPI
from tortoise import Tortoise
from app.config import TORTOISE_ORM
from app.api import user
app = FastAPI()
# 数据库初始化
@app.on_event("startup")
async def startup():
"""启动时初始化数据库连接"""
await Tortoise.init(config=TORTOISE_ORM)
await Tortoise.generate_schemas()
@app.on_event("shutdown")
async def shutdown():
"""关闭时关闭数据库连接"""
await Tortoise.close_connections()
# 注册路由
app.include_router(user.router)
6. 事务管理与并发控制
6.1 事务操作与回滚
from tortoise.transactions import in_transaction
async def create_user_with_post():
"""创建用户和帖子,使用事务确保原子性"""
# 假设模型已定义
from app.models import User, Post
async with in_transaction() as conn:
try:
# 创建用户
user = User(name="Alice", email="alice@example.com")
await user.save(using_db=conn)
# 创建帖子
post = Post(title="Hello", content="World", user=user)
await post.save(using_db=conn)
# 提交事务
except Exception as e:
# 自动回滚
raise e
# 运行示例
import asyncio
asyncio.run(create_user_with_post())
6.2 异步事务处理
async def transfer_money(from_user_id: int, to_user_id: int, amount: float):
"""转账操作,使用事务确保原子性"""
# 假设模型已定义
from app.models import User
async with in_transaction() as conn:
# 获取用户
from_user = await User.get(id=from_user_id, using_db=conn)
to_user = await User.get(id=to_user_id, using_db=conn)
# 检查余额
if from_user.balance < amount:
raise ValueError("Insufficient balance")
# 执行转账
from_user.balance -= amount
to_user.balance += amount
# 保存更改
await from_user.save(using_db=conn)
await to_user.save(using_db=conn)
6.3 并发控制策略
from tortoise import fields
from tortoise.models import Model
# 乐观锁实现
class Product(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
price = fields.FloatField()
version = fields.IntField(default=0)
async def save(self, *args, **kwargs):
"""保存时自动增加版本号"""
self.version += 1
if 'update_fields' in kwargs:
kwargs['update_fields'].append('version')
await super().save(*args, **kwargs)
async def update_product(product_id: int, new_price: float):
"""使用乐观锁更新产品价格"""
max_retries = 3
for attempt in range(max_retries):
product = await Product.get(id=product_id)
original_version = product.version
product.price = new_price
try:
# 尝试保存,使用 where 条件确保版本号匹配
await product.save(update_fields=['price', 'version'], force_update=True)
print("更新成功")
return True
except Exception as e:
print(f"更新失败,可能是并发冲突: {e}")
if attempt < max_retries - 1:
print(f"重试 {attempt + 1}/{max_retries}")
continue
return False
6.4 事务参数配置
async def transaction_with_options():
"""使用自定义参数的事务操作"""
# 自定义事务参数
async with in_transaction(
connection_name="default", # 数据库连接名称
isolation_level="READ COMMITTED" # 隔离级别
) as conn:
# 执行操作
pass
# 事务隔离级别
async def transaction_with_isolation():
"""使用特定隔离级别的事务操作"""
# 设置隔离级别
async with in_transaction(isolation_level="SERIALIZABLE") as conn:
# 执行操作
pass
7. 性能优化与最佳实践
7.1 查询优化技巧
async def query_optimization():
"""查询优化示例"""
# 假设模型已定义
from app.models import User, Post
# 预加载关系
users = await User.all().prefetch_related("posts", "profile")
# 仅选择需要的字段
users = await User.all().only("id", "name", "email")
# 避免 N+1 查询
posts = await Post.all().prefetch_related("user")
# 使用 select_related(对于外键)
posts = await Post.all().select_related("user")
# 批量查询
users = await User.filter(id__in=[1, 2, 3, 4, 5])
7.2 批量操作优化
async def batch_optimization():
"""批量操作优化示例"""
from app.models import User
# 批量创建优化
async def batch_create_users():
"""批量创建用户,使用合适的批处理大小"""
users = [
User(name=f"User {i}", email=f"user{i}@example.com")
for i in range(1000)
]
# 分批处理,每批50个
batch_size = 50
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
await User.bulk_create(batch, batch_size=batch_size)
print("批量创建完成")
# 批量更新优化
async def batch_update_users():
"""批量更新用户状态"""
# 使用 bulk_update 进行批量更新
users = await User.filter(is_active=True).limit(100)
for user in users:
user.name = f"Updated {user.name}"
await User.bulk_update(users, fields=["name"])
print("批量更新完成")
await batch_create_users()
await batch_update_users()
7.3 连接池配置与优化
async def configure_connection_pool():
"""配置连接池"""
try:
await Tortoise.init(
db_url="postgres://user:password@localhost:5432/mydb",
modules={"models": ["app.models"]},
# 连接池配置
pool_size=20, # 连接池大小,根据并发量调整
max_overflow=10, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间,避免连接过期
pool_pre_ping=True, # 连接前ping数据库,确保连接有效
timeout=30 # 连接超时时间
)
await Tortoise.generate_schemas()
print("连接池配置成功")
except Exception as e:
print(f"连接池配置失败: {e}")
raise
7.4 缓存策略
from aiocache import Cache
# 创建内存缓存实例
cache = Cache(Cache.MEMORY)
async def get_user_cache(user_id: int):
"""从缓存获取用户信息"""
# 假设模型已定义
from app.models import User
# 尝试从缓存获取
cached_user = await cache.get(f"user:{user_id}")
if cached_user:
return cached_user
# 从数据库获取
user = await User.get(id=user_id)
# 存入缓存
await cache.set(f"user:{user_id}", user, ttl=3600)
return user
# 缓存失效
async def invalidate_user_cache(user_id: int):
"""使指定用户的缓存失效"""
await cache.delete(f"user:{user_id}")
7.5 缓存策略最佳实践
from aiocache import Cache, cached
from aiocache.serializers import PickleSerializer
# 创建Redis缓存实例
redis_cache = Cache(
Cache.REDIS,
endpoint="localhost",
port=6379,
db=0,
serializer=PickleSerializer()
)
# 使用装饰器缓存
@cached(
ttl=3600,
cache=redis_cache,
key_builder=lambda f, *args, **kwargs: f"user:{args[0]}"
)
async def get_user_with_cache(user_id: int):
"""使用装饰器缓存用户信息"""
from app.models import User
user = await User.get(id=user_id)
return user
# 缓存批量查询
async def get_users_in_bulk(user_ids: list):
"""批量获取用户,使用缓存优化"""
from app.models import User
users = []
missing_ids = []
# 先从缓存获取
for user_id in user_ids:
cached_user = await redis_cache.get(f"user:{user_id}")
if cached_user:
users.append(cached_user)
else:
missing_ids.append(user_id)
# 从数据库获取缺失的用户
if missing_ids:
db_users = await User.filter(id__in=missing_ids)
# 存入缓存
for user in db_users:
await redis_cache.set(f"user:{user.id}", user, ttl=3600)
users.extend(db_users)
return users
7.6 数据库索引设计
from tortoise import fields
from tortoise.models import Model
# 单列索引
class User(Model):
id = fields.IntField(pk=True) # 自动创建索引
name = fields.CharField(max_length=50, index=True) # 显式创建索引
# 复合索引
class UserWithCompositeIndex(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100)
class Meta:
indexes = [
("name", "email"), # 复合索引
]
# 唯一索引
class UserWithUniqueIndex(Model):
id = fields.IntField(pk=True)
email = fields.CharField(max_length=100, unique=True) # 唯一索引
# 部分索引(PostgreSQL)
class UserWithPartialIndex(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
is_active = fields.BooleanField(default=True)
class Meta:
indexes = [
("name",),
{
"fields": ["name"],
"where": "is_active = true",
"name": "idx_active_users_name"
}
]
8. 数据库迁移与 Aerich
8.1 Aerich 安装与配置
# 安装 Aerich
pip install aerich
# 初始化
aerich init -t app.config.TORTOISE_ORM
# 初始化数据库
aerich init-db
配置示例:
# app/config.py
TORTOISE_ORM = {
"connections": {
"default": "postgres://user:password@localhost:5432/mydb"
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"],
"default_connection": "default",
},
},
}
8.2 迁移脚本生成与执行
# 生成迁移
aerich migrate --name initial
# 执行迁移
aerich upgrade
# 回滚迁移
aerich downgrade
# 查看迁移历史
aerich history
# 查看当前版本
aerich current
8.3 迁移策略与最佳实践
常见迁移问题与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 迁移失败 | 数据库连接问题 | 检查连接字符串和数据库状态 |
| 字段类型不兼容 | 类型转换问题 | 手动修改迁移脚本,添加类型转换逻辑 |
| 唯一约束冲突 | 数据重复 | 清理重复数据后再执行迁移 |
| 外键约束错误 | 引用关系问题 | 确保引用的表和字段存在 |
9. 高级特性与扩展
9.1 事件系统
from tortoise.signals import post_save, pre_delete, post_delete, pre_save
@post_save(User)
async def user_created(sender, instance, created, using_db, update_fields):
"""用户保存后触发的事件"""
if created:
# 用户创建后执行操作
print(f"User {instance.name} created")
else:
# 用户更新后执行操作
print(f"User {instance.name} updated")
@pre_delete(User)
async def user_deleted(sender, instance, using_db):
"""用户删除前触发的事件"""
# 用户删除前执行操作
print(f"User {instance.name} will be deleted")
@post_delete(User)
async def user_deleted(sender, instance, using_db):
"""用户删除后触发的事件"""
# 用户删除后执行操作
print(f"User {instance.name} deleted")
@pre_save(User)
async def user_saved(sender, instance, using_db, update_fields):
"""用户保存前触发的事件"""
# 用户保存前执行操作
print(f"User {instance.name} will be saved")
9.2 自定义字段
from tortoise.fields import Field
import json
class CustomJSONField(Field):
"""自定义 JSON 字段,用于处理复杂的 JSON 数据"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def to_db_value(self, value, instance):
"""将 Python 值转换为数据库值"""
if value is None:
return None
return json.dumps(value)
def to_python_value(self, value):
"""将数据库值转换为 Python 值"""
if value is None:
return None
return json.loads(value)
class User(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
preferences = CustomJSONField(default={})
9.3 扩展开发
# 创建基础模型类
class TimestampedModel(Model):
id = fields.IntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class Meta:
abstract = True
# 创建扩展功能
class SoftDeleteModel(TimestampedModel):
is_deleted = fields.BooleanField(default=False)
async def delete(self, using_db=None):
"""软删除方法"""
# 软删除
self.is_deleted = True
await self.save(using_db=using_db)
@classmethod
async def all(cls, *args, **kwargs):
"""获取所有未删除的记录"""
# 只返回未删除的记录
return await super().filter(is_deleted=False, *args, **kwargs)
# 使用扩展
class User(SoftDeleteModel):
name = fields.CharField(max_length=50)
email = fields.CharField(max_length=100, unique=True)
9.4 第三方集成
import json
import aioredis
from celery import Celery
# 集成 Redis 缓存(异步)
redis_pool = None
async def init_redis():
"""初始化 Redis 连接池"""
global redis_pool
redis_pool = await aioredis.from_url(
"redis://localhost:6379/0",
encoding="utf-8",
decode_responses=True
)
async def get_user_from_cache(user_id: int):
"""从缓存获取用户信息"""
# 假设模型已定义
from app.models import User
if not redis_pool:
await init_redis()
# 尝试从 Redis 获取
user_data = await redis_pool.get(f"user:{user_id}")
if user_data:
return json.loads(user_data)
# 从数据库获取
user = await User.get(id=user_id)
# 存入 Redis
user_dict = {
"id": user.id,
"name": user.name,
"email": user.email
}
await redis_pool.set(f"user:{user_id}", json.dumps(user_dict), ex=3600)
return user_dict
# 集成 Celery 异步任务
celery = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery.task
def send_email(user_id: int, subject: str, message: str):
"""发送邮件的异步任务"""
# 发送邮件的逻辑
pass
10. 常见问题与解决方案
10.1 易错点分析
1.忘记使用 await
- 错误:user.save()
- 正确:await user.save()
2.循环导入
- 错误:直接导入模型
- 正确:使用字符串引用模型,如 "models.User"
3.查询集链式调用
- 错误:User.filter().all() 后再添加条件
- 正确:在 all() 前完成所有过滤
4.关系查询
- 错误:直接访问关系属性而不预加载
- 正确:使用 prefetch_related() 预加载关系
5.事务管理
- 错误:手动管理事务而不使用 in_transaction()
- 正确:使用 async with in_transaction() as conn:
10.2 疑难点解析
- 关系查询深度
- 问题:深层关系查询性能差
- 解决方案:使用
prefetch_related()控制加载深度,避免加载不必要的关系
- 事务隔离级别
- 问题:事务并发冲突
- 解决方案:根据业务需求选择合适的隔离级别,使用乐观锁或悲观锁
- 数据库迁移
- 问题:迁移失败或数据丢失
- 解决方案:使用 Aerich 管理迁移,迁移前备份数据库,测试迁移脚本
- 性能优化
- 问题:查询速度慢
- 解决方案:使用索引,优化查询,合理使用缓存
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)