HoRain云--FastAPI Pydantic 模型

🎬 HoRain云小助手:个人主页
🔥 个人专栏: 《Linux 系列教程》《c语言教程》
⛺️生活的理想,就是为了理想的生活!
⛳️ 推荐
前些天发现了一个超棒的服务器购买网站,性价比超高,大内存超划算!忍不住分享一下给大家。点击跳转到网站。
专栏介绍
|
专栏名称 |
专栏介绍 |
|
本专栏主要撰写C干货内容和编程技巧,让大家从底层了解C,把更多的知识由抽象到简单通俗易懂。 |
|
|
本专栏主要是注重从底层来给大家一步步剖析网络协议的奥秘,一起解密网络协议在运行中协议的基本运行机制! |
|
|
全面深入解析 docker 容器,从基础到进阶,涵盖原理、操作、实践案例,助您精通 docker。 |
|
|
本专栏主要撰写Linux干货内容,从基础到进阶,知识由抽象到简单通俗易懂,帮你从新手小白到扫地僧。 |
|
|
本专栏着重撰写Python相关的干货内容与编程技巧,助力大家从底层去认识Python,将更多复杂的知识由抽象转化为简单易懂的内容。 |
|
|
本专栏主要是发布一些考试和练习题库(涵盖软考、HCIE、HRCE、CCNA等) |
目录

🚀 FastAPI Pydantic 模型完全指南
Pydantic 是 FastAPI 的核心组件,用于数据验证、序列化和文档生成。它通过 Python 类型注解提供强大的数据验证功能。
📋 目录
🎯 Pydantic 基础概念
什么是 Pydantic?
Pydantic 是一个基于 Python 类型注解的数据验证和设置管理库,主要功能包括:
-
数据验证:自动验证输入数据的类型和约束
-
数据序列化:将 Python 对象转换为字典/JSON
-
数据解析:将原始数据解析为 Python 对象
-
文档生成:自动生成 API 文档
安装 Pydantic
pip install pydantic
# 或与 FastAPI 一起安装
pip install fastapi[all]
📝 基本模型定义
1. 创建基础模型
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import datetime, date
from enum import Enum
# 基础用户模型
class User(BaseModel):
id: int
username: str
email: str
age: Optional[int] = None # 可选字段,默认值为 None
is_active: bool = True # 默认值为 True
created_at: datetime = datetime.now() # 默认值为当前时间
class Config:
# Pydantic 配置
from_attributes = True # 允许从 ORM 对象转换
json_encoders = {
datetime: lambda v: v.isoformat() # 自定义 JSON 编码
}
# 使用示例
user_data = {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"age": 25
}
# 创建 Pydantic 模型实例
user = User(**user_data)
print(user.id) # 1
print(user.username) # "john_doe"
print(user.model_dump()) # 转换为字典
print(user.model_dump_json()) # 转换为 JSON 字符串
2. 在 FastAPI 中使用基础模型
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from typing import Optional
app = FastAPI()
# 请求模型
class UserCreate(BaseModel):
username: str
email: EmailStr # 邮箱格式验证
password: str
age: Optional[int] = None
# 响应模型
class UserResponse(BaseModel):
id: int
username: str
email: str
age: Optional[int] = None
is_active: bool
# 内存存储(示例)
users_db = {}
user_id_counter = 1
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建新用户"""
global user_id_counter
# 检查用户名是否已存在
for existing_user in users_db.values():
if existing_user["username"] == user.username:
raise HTTPException(status_code=400, detail="用户名已存在")
# 创建用户
user_id = user_id_counter
user_data = {
"id": user_id,
"username": user.username,
"email": user.email,
"age": user.age,
"is_active": True
}
users_db[user_id] = user_data
user_id_counter += 1
return user_data
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""获取用户信息"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
return users_db[user_id]
@app.get("/users/", response_model=List[UserResponse])
async def list_users(skip: int = 0, limit: int = 10):
"""获取用户列表"""
users = list(users_db.values())
return users[skip:skip + limit]
🔧 字段类型与验证
1. 基本字段类型
from pydantic import BaseModel, Field, validator
from typing import Optional, List, Set, Dict, Union
from datetime import datetime, date, time
from decimal import Decimal
from uuid import UUID
import re
class Product(BaseModel):
# 基本类型
id: int
name: str
price: float
in_stock: bool = True
# 带约束的字符串
sku: str = Field(..., min_length=3, max_length=50, regex=r'^[A-Z0-9-]+$')
# 数值约束
quantity: int = Field(ge=0, le=1000) # 0 <= quantity <= 1000
discount: float = Field(ge=0.0, le=1.0) # 0.0 <= discount <= 1.0
# 可选字段
description: Optional[str] = Field(None, max_length=500)
# 列表和字典
tags: List[str] = Field(default_factory=list)
metadata: Dict[str, Union[str, int, float]] = Field(default_factory=dict)
# 日期时间
created_at: datetime = Field(default_factory=datetime.now)
expiry_date: Optional[date] = None
# 特殊类型
product_uuid: UUID = Field(default_factory=UUID)
precise_price: Decimal = Field(..., max_digits=10, decimal_places=2)
# 枚举类型
class Category(str, Enum):
ELECTRONICS = "electronics"
CLOTHING = "clothing"
BOOKS = "books"
FOOD = "food"
category: Category
# 配置
class Config:
json_schema_extra = {
"example": {
"id": 1,
"name": "Laptop",
"price": 999.99,
"sku": "LP-001",
"quantity": 50,
"category": "electronics",
"precise_price": "999.99"
}
}
2. Field 参数详解
from pydantic import BaseModel, Field
from typing import Optional
class Item(BaseModel):
# 必需字段
name: str = Field(
...,
title="商品名称",
description="商品的名称,3-100个字符",
min_length=3,
max_length=100,
example="智能手机"
)
# 可选字段带默认值
description: Optional[str] = Field(
None,
title="商品描述",
description="商品的详细描述,最多500字符",
max_length=500,
example="这是一款高性能智能手机"
)
# 数值字段
price: float = Field(
...,
gt=0,
title="价格",
description="商品价格,必须大于0",
example=2999.99
)
quantity: int = Field(
1,
ge=0,
le=1000,
title="数量",
description="库存数量,0-1000",
example=100
)
# 带正则验证
item_code: str = Field(
...,
regex=r'^[A-Z]{2}-\d{4}$',
title="商品编码",
description="商品编码格式:两个大写字母+横线+4位数字",
example="AB-1234"
)
# 别名
item_type: str = Field(
...,
alias="type",
title="商品类型",
description="商品的分类类型"
)
# 排除字段(不包含在输出中)
internal_id: int = Field(
...,
exclude=True,
title="内部ID",
description="内部使用的ID,不对外暴露"
)
3. 自定义验证器
from pydantic import BaseModel, validator, root_validator
from typing import Optional
import re
class UserRegistration(BaseModel):
username: str
email: str
password: str
confirm_password: str
age: Optional[int] = None
# 单个字段验证器
@validator('username')
def validate_username(cls, v):
if len(v) < 3:
raise ValueError('用户名至少3个字符')
if len(v) > 50:
raise ValueError('用户名最多50个字符')
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('用户名只能包含字母、数字和下划线')
return v
@validator('email')
def validate_email(cls, v):
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, v):
raise ValueError('邮箱格式不正确')
return v.lower() # 转换为小写
@validator('password')
def validate_password(cls, v):
if len(v) < 8:
raise ValueError('密码至少8个字符')
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
return v
# 多字段验证器
@validator('confirm_password')
def validate_passwords_match(cls, v, values):
if 'password' in values and v != values['password']:
raise ValueError('两次输入的密码不一致')
return v
@validator('age')
def validate_age(cls, v):
if v is not None:
if v < 0:
raise ValueError('年龄不能为负数')
if v > 150:
raise ValueError('年龄不能超过150岁')
return v
# 根验证器(所有字段验证完成后执行)
@root_validator(pre=True) # pre=True 在字段验证前执行
def validate_all_fields(cls, values):
# 检查必填字段
required_fields = ['username', 'email', 'password', 'confirm_password']
for field in required_fields:
if field not in values or values[field] is None:
raise ValueError(f'{field} 是必填字段')
return values
@root_validator # 默认在字段验证后执行
def post_validation(cls, values):
# 验证后处理
if 'age' in values and values['age'] is not None:
if values['age'] < 18:
values['is_adult'] = False
else:
values['is_adult'] = True
return values
🎛️ 高级验证器
1. 条件验证
from pydantic import BaseModel, validator, root_validator
from typing import Optional, Literal
from datetime import date
class Order(BaseModel):
order_type: Literal['standard', 'express', 'international']
shipping_address: Optional[str] = None
billing_address: Optional[str] = None
delivery_date: Optional[date] = None
express_fee: Optional[float] = None
international_tax: Optional[float] = None
# 条件验证:快递订单必须有快递费
@validator('express_fee')
def validate_express_fee(cls, v, values):
if values.get('order_type') == 'express' and v is None:
raise ValueError('快递订单必须指定快递费')
if v is not None and v <= 0:
raise ValueError('快递费必须大于0')
return v
# 条件验证:国际订单必须有国际税
@validator('international_tax')
def validate_international_tax(cls, v, values):
if values.get('order_type') == 'international' and v is None:
raise ValueError('国际订单必须指定国际税')
if v is not None and v <= 0:
raise ValueError('国际税必须大于0')
return v
# 条件验证:标准订单不能有快递费和国际税
@root_validator
def validate_standard_order(cls, values):
order_type = values.get('order_type')
express_fee = values.get('express_fee')
international_tax = values.get('international_tax')
if order_type == 'standard':
if express_fee is not None:
raise ValueError('标准订单不能有快递费')
if international_tax is not None:
raise ValueError('标准订单不能有国际税')
return values
# 验证送货地址
@validator('shipping_address')
def validate_shipping_address(cls, v, values):
if v is None and values.get('order_type') != 'digital':
raise ValueError('非数字商品订单必须提供送货地址')
return v
# 验证送货日期
@validator('delivery_date')
def validate_delivery_date(cls, v, values):
if v is not None:
if v < date.today():
raise ValueError('送货日期不能是过去日期')
# 快递订单必须在3天内送达
if values.get('order_type') == 'express':
from datetime import timedelta
max_date = date.today() + timedelta(days=3)
if v > max_date:
raise ValueError('快递订单必须在3天内送达')
return v
2. 自定义验证函数
from pydantic import BaseModel, validator
from typing import Optional
import phonenumbers
class ContactForm(BaseModel):
name: str
email: str
phone: Optional[str] = None
message: str
# 自定义手机号验证
@validator('phone')
def validate_phone_number(cls, v):
if v is None:
return v
try:
# 尝试解析手机号
parsed_number = phonenumbers.parse(v, "CN")
if not phonenumbers.is_valid_number(parsed_number):
raise ValueError('无效的手机号码')
# 格式化手机号
return phonenumbers.format_number(
parsed_number,
phonenumbers.PhoneNumberFormat.E164
)
except phonenumbers.NumberParseException:
raise ValueError('手机号码格式不正确')
# 自定义邮箱验证
@validator('email')
def validate_email_domain(cls, v):
# 检查邮箱域名
domain = v.split('@')[-1]
# 禁止的域名列表
blocked_domains = ['tempmail.com', 'throwaway.com']
if domain in blocked_domains:
raise ValueError('不允许使用临时邮箱')
return v
# 自定义消息长度验证
@validator('message')
def validate_message_length(cls, v):
words = v.split()
if len(words) < 10:
raise ValueError('消息至少需要10个单词')
if len(words) > 1000:
raise ValueError('消息最多1000个单词')
return v
🔗 嵌套模型与关系
1. 嵌套模型
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime
# 地址模型
class Address(BaseModel):
street: str
city: str
state: str
zip_code: str
country: str = "中国"
class Config:
json_schema_extra = {
"example": {
"street": "人民路123号",
"city": "北京",
"state": "北京",
"zip_code": "100000",
"country": "中国"
}
}
# 订单项模型
class OrderItem(BaseModel):
product_id: int
product_name: str
quantity: int = Field(ge=1, le=100)
unit_price: float = Field(gt=0)
@property
def total_price(self):
return self.quantity * self.unit_price
# 用户模型
class UserDetail(BaseModel):
id: int
username: str
email: str
full_name: Optional[str] = None
addresses: List[Address] = Field(default_factory=list)
# 计算属性
@property
def has_address(self):
return len(self.addresses) > 0
# 方法
def get_primary_address(self):
return self.addresses[0] if self.addresses else None
# 订单模型
class Order(BaseModel):
id: int
user_id: int
items: List[OrderItem]
shipping_address: Address
billing_address: Optional[Address] = None
order_date: datetime = Field(default_factory=datetime.now)
status: str = Field(default="pending", regex="^(pending|processing|shipped|delivered|cancelled)$")
# 计算总价
@property
def total_amount(self):
return sum(item.total_price for item in self.items)
# 计算商品总数
@property
def total_items(self):
return sum(item.quantity for item in self.items)
# 验证方法
def validate_order(self):
if self.total_amount <= 0:
raise ValueError("订单总金额必须大于0")
if not self.items:
raise ValueError("订单必须包含至少一个商品")
return True
# 使用示例
def create_sample_order():
# 创建地址
shipping_address = Address(
street="科技园路456号",
city="深圳",
state="广东",
zip_code="518000"
)
# 创建订单项
items = [
OrderItem(
product_id=1,
product_name="笔记本电脑",
quantity=1,
unit_price=7999.99
),
OrderItem(
product_id=2,
product_name="鼠标",
quantity=2,
unit_price=99.99
)
]
# 创建订单
order = Order(
id=1001,
user_id=123,
items=items,
shipping_address=shipping_address,
status="processing"
)
print(f"订单ID: {order.id}")
print(f"总金额: {order.total_amount}")
print(f"商品总数: {order.total_items}")
print(f"送货地址: {order.shipping_address.city}")
return order
2. 模型继承
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
# 基础模型
class BaseEntity(BaseModel):
id: int
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = None
is_active: bool = True
def update_timestamp(self):
self.updated_at = datetime.now()
class Config:
from_attributes = True
# 用户基础模型
class UserBase(BaseEntity):
username: str
email: str
class Config:
json_schema_extra = {
"example": {
"id": 1,
"username": "john_doe",
"email": "john@example.com"
}
}
# 创建用户模型(用于创建操作)
class UserCreate(UserBase):
password: str
confirm_password: str
@validator('confirm_password')
def passwords_match(cls, v, values):
if 'password' in values and v != values['password']:
raise ValueError('密码不匹配')
return v
# 更新用户模型(用于更新操作)
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[str] = None
password: Optional[str] = None
class Config:
json_schema_extra = {
"example": {
"username": "new_username",
"email": "new_email@example.com"
}
}
# 用户响应模型(用于API响应)
class UserResponse(UserBase):
profile_image: Optional[str] = None
bio: Optional[str] = None
class Config:
json_schema_extra = {
"example": {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"created_at": "2024-01-01T00:00:00",
"is_active": True,
"profile_image": "https://example.com/image.jpg",
"bio": "软件工程师"
}
}
# 用户详情模型(包含敏感信息,仅内部使用)
class UserDetail(UserResponse):
hashed_password: str
last_login: Optional[datetime] = None
login_count: int = 0
class Config:
# 排除敏感字段从序列化输出
fields = {
'hashed_password': {'exclude': True}
}
# 使用示例
def user_workflow():
# 创建用户
create_data = {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"password": "secure_password",
"confirm_password": "secure_password"
}
user_create = UserCreate(**create_data)
print(f"创建用户: {user_create.username}")
# 更新用户
update_data = {
"username": "john_updated",
"email": "john.updated@example.com"
}
user_update = UserUpdate(**update_data)
print(f"更新用户: {user_update.username}")
# 响应数据
response_data = {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"created_at": datetime.now(),
"profile_image": "https://example.com/avatar.jpg",
"bio": "热爱编程的开发者",
"hashed_password": "hashed_value",
"login_count": 10
}
user_response = UserResponse(**response_data)
print(f"用户响应: {user_response.model_dump()}")
user_detail = UserDetail(**response_data)
print(f"用户详情(排除密码): {user_detail.model_dump()}")
return user_response
📤 响应模型
1. 基本响应模型
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
app = FastAPI()
# 基础数据模型
class ProductBase(BaseModel):
name: str
description: Optional[str] = None
price: float
category: str
# 创建请求模型
class ProductCreate(ProductBase):
stock: int = Field(ge=0, description="库存数量")
# 更新请求模型
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
stock: Optional[int] = Field(None, ge=0)
# 响应模型
class ProductResponse(ProductBase):
id: int
stock: int
created_at: datetime
updated_at: Optional[datetime] = None
is_available: bool = True
@property
def formatted_price(self):
return f"¥{self.price:.2f}"
class Config:
from_attributes = True
json_schema_extra = {
"example": {
"id": 1,
"name": "智能手机",
"description": "高性能智能手机",
"price": 2999.99,
"category": "electronics",
"stock": 100,
"created_at": "2024-01-01T10:00:00",
"is_available": True
}
}
# 分页响应模型
class PaginatedResponse(BaseModel):
items: List[ProductResponse]
total: int
page: int
size: int
pages: int
@property
def has_next(self):
return self.page < self.pages
@property
def has_prev(self):
return self.page > 1
# 错误响应模型
class ErrorResponse(BaseModel):
detail: str
code: str
timestamp: datetime = Field(default_factory=datetime.now)
class Config:
json_schema_extra = {
"example": {
"detail": "商品不存在",
"code": "PRODUCT_NOT_FOUND",
"timestamp": "2024-01-01T10:00:00"
}
}
# 成功响应模型
class SuccessResponse(BaseModel):
success: bool = True
message: str
data: Optional[Dict[str, Any]] = None
timestamp: datetime = Field(default_factory=datetime.now)
# API 端点
@app.post("/products/",
response_model=ProductResponse,
status_code=status.HTTP_201_CREATED,
responses={
400: {"model": ErrorResponse, "description": "验证错误"},
409: {"model": ErrorResponse, "description": "商品已存在"}
})
async def create_product(product: ProductCreate):
"""创建新产品"""
# 这里应该是数据库操作
product_data = {
"id": 1,
**product.model_dump(),
"created_at": datetime.now(),
"updated_at": None
}
return ProductResponse(**product_data)
@app.get("/products/{product_id}",
response_model=ProductResponse,
responses={
404: {"model": ErrorResponse, "description": "商品不存在"}
})
async def get_product(product_id: int):
"""获取单个商品"""
# 模拟数据库查询
if product_id != 1:
raise HTTPException(
status_code=404,
detail=ErrorResponse(
detail=f"商品ID {product_id} 不存在",
code="PRODUCT_NOT_FOUND"
).model_dump()
)
product_data = {
"id": product_id,
"name": "示例商品",
"description": "这是一个示例商品",
"price": 99.99,
"category": "sample",
"stock": 50,
"created_at": datetime.now(),
"updated_at": None
}
return ProductResponse(**product_data)
@app.get("/products/",
response_model=PaginatedResponse,
responses={
400: {"model": ErrorResponse, "description": "参数错误"}
})
async def list_products(
page: int = 1,
size: int = 10,
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None
):
"""获取商品列表(分页)"""
# 模拟数据库查询和分页
total_items = 100
total_pages = (total_items + size - 1) // size
# 模拟数据
items = []
for i in range(size):
items.append(ProductResponse(
id=i + 1,
name=f"商品 {i + 1}",
description=f"商品 {i + 1} 的描述",
price=100.0 + i * 10,
category="electronics",
stock=50,
created_at=datetime.now(),
updated_at=None
))
return PaginatedResponse(
items=items,
total=total_items,
page=page,
size=size,
pages=total_pages
)
@app.put("/products/{product_id}",
response_model=ProductResponse,
responses={
404: {"model": ErrorResponse, "description": "商品不存在"},
400: {"model": ErrorResponse, "description": "验证错误"}
})
async def update_product(product_id: int, product_update: ProductUpdate):
"""更新商品信息"""
# 模拟更新操作
existing_product = {
"id": product_id,
"name": "原始商品",
"description": "原始描述",
"price": 100.0,
"category": "electronics",
"stock": 50,
"created_at": datetime.now(),
"updated_at": None
}
# 更新字段
update_data = product_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
if value is not None:
existing_product[key] = value
existing_product["updated_at"] = datetime.now()
return ProductResponse(**existing_product)
@app.delete("/products/{product_id}",
response_model=SuccessResponse,
responses={
404: {"model": ErrorResponse, "description": "商品不存在"}
})
async def delete_product(product_id: int):
"""删除商品"""
# 模拟删除操作
return SuccessResponse(
message=f"商品 {product_id} 删除成功",
data={"product_id": product_id}
)
2. 响应模型配置
from fastapi import FastAPI
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, List
from datetime import datetime
app = FastAPI()
class Product(BaseModel):
# 模型配置(新方式)
model_config = ConfigDict(
from_attributes=True, # 允许从ORM对象转换
populate_by_name=True, # 允许使用字段名和别名
str_strip_whitespace=True, # 自动去除字符串空格
str_min_length=1, # 字符串最小长度
extra='ignore', # 忽略额外字段
json_schema_extra={
"example": {
"id": 1,
"name": "商品名称",
"price": 99.99
}
}
)
id: int
name: str = Field(min_length=1, max_length=100)
price: float = Field(gt=0)
description: Optional[str] = Field(None, max_length=500)
# 计算字段(不存储在模型中)
@property
def price_with_tax(self):
return self.price * 1.1
# 序列化配置
def model_dump(self, *args, **kwargs):
# 自定义序列化
data = super().model_dump(*args, **kwargs)
data['price_with_tax'] = self.price_with_tax
return data
# 响应模型排除字段
class ProductPublic(BaseModel):
id: int
name: str
price: float
description: Optional[str]
class Config:
from_attributes = True
class ProductPrivate(ProductPublic):
cost: float # 成本价,不对外公开
profit_margin: float
class Config:
# 排除敏感字段
fields = {
'cost': {'exclude': True},
'profit_margin': {'exclude': True}
}
# 动态响应模型
def get_product_response_model(include_sensitive: bool = False):
"""根据权限返回不同的响应模型"""
if include_sensitive:
class ProductResponse(ProductPrivate):
class Config:
from_attributes = True
return ProductResponse
else:
class ProductResponse(ProductPublic):
class Config:
from_attributes = True
return ProductResponse
@app.get("/products/{product_id}")
async def get_product(
product_id: int,
admin: bool = False # 模拟管理员权限
):
"""获取商品信息,管理员可以看到更多信息"""
# 模拟数据库查询
product_data = {
"id": product_id,
"name": "示例商品",
"price": 99.99,
"description": "商品描述",
"cost": 50.00, # 成本价
"profit_margin": 0.5 # 利润率
}
# 根据权限选择响应模型
ResponseModel = get_product_response_model(include_sensitive=admin)
return ResponseModel(**product_data)
# 嵌套响应模型
class Category(BaseModel):
id: int
name: str
description: Optional[str] = None
class ProductWithCategory(ProductPublic):
category: Category
related_products: List[ProductPublic] = []
@app.get("/products/{product_id}/detail")
async def get_product_detail(product_id: int):
"""获取商品详情(包含分类信息)"""
product_data = {
"id": product_id,
"name": "示例商品",
"price": 99.99,
"description": "商品描述",
"category": {
"id": 1,
"name": "电子产品",
"description": "电子设备分类"
},
"related_products": [
{
"id": 2,
"name": "相关商品1",
"price": 79.99,
"description": "相关商品描述1"
},
{
"id": 3,
"name": "相关商品2",
"price": 119.99,
"description": "相关商品描述2"
}
]
}
return ProductWithCategory(**product_data)
🏗️ 实际应用示例
1. 完整的用户管理系统
from fastapi import FastAPI, HTTPException, Depends, status, Query
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, EmailStr, validator, ConfigDict
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from enum import Enum
import uuid
app = FastAPI(title="用户管理系统", version="1.0.0")
security = HTTPBearer()
# 枚举类型
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class UserStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
BANNED = "banned"
# 基础模型
class UserBase(BaseModel):
username: str = Field(..., min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
email: EmailStr
full_name: Optional[str] = Field(None, max_length=100)
model_config = ConfigDict(
from_attributes=True,
str_strip_whitespace=True
)
# 创建用户模型
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
confirm_password: str
@validator('password')
def validate_password_strength(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?`~' for c in v):
raise ValueError('密码必须包含至少一个特殊字符')
return v
@validator('confirm_password')
def passwords_match(cls, v, values):
if 'password' in values and v != values['password']:
raise ValueError('两次输入的密码不一致')
return v
# 更新用户模型
class UserUpdate(BaseModel):
username: Optional[str] = Field(None, min_length=3, max_length=50)
email: Optional[EmailStr] = None
full_name: Optional[str] = Field(None, max_length=100)
bio: Optional[str] = Field(None, max_length=500)
avatar_url: Optional[str] = None
@validator('username')
def validate_username(cls, v):
if v and not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('用户名只能包含字母、数字和下划线')
return v
# 用户响应模型
class UserResponse(UserBase):
id: int
role: UserRole = UserRole.USER
status: UserStatus = UserStatus.ACTIVE
bio: Optional[str] = None
avatar_url: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
last_login: Optional[datetime] = None
@property
def is_admin(self):
return self.role == UserRole.ADMIN
@property
def is_active(self):
return self.status == UserStatus.ACTIVE
model_config = ConfigDict(
json_schema_extra={
"example": {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"full_name": "John Doe",
"role": "user",
"status": "active",
"bio": "软件工程师",
"avatar_url": "https://example.com/avatar.jpg",
"created_at": "2024-01-01T00:00:00",
"updated_at": "2024-01-02T00:00:00"
}
}
)
# 用户详情模型(管理员可见)
class UserDetail(UserResponse):
login_count: int = 0
failed_login_attempts: int = 0
account_locked_until: Optional[datetime] = None
model_config = ConfigDict(
fields={
'failed_login_attempts': {'exclude': True},
'account_locked_until': {'exclude': True}
}
)
# 分页响应
class PaginatedUsers(BaseModel):
items: List[UserResponse]
total: int
page: int
size: int
total_pages: int
has_next: bool
has_prev: bool
# 登录模型
class LoginRequest(BaseModel):
username: str
password: str
remember_me: bool = False
class LoginResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
user: UserResponse
# 密码重置模型
class PasswordResetRequest(BaseModel):
email: EmailStr
class PasswordResetConfirm(BaseModel):
token: str
new_password: str
confirm_password: str
@validator('new_password')
def validate_password_strength(cls, v):
if len(v) < 8:
raise ValueError('密码至少8个字符')
if not any(c.isupper() for c in v):
raise ValueError('密码必须包含至少一个大写字母')
if not any(c.islower() for c in v):
raise ValueError('密码必须包含至少一个小写字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含至少一个数字')
return v
@validator('confirm_password')
def passwords_match(cls, v, values):
if 'new_password' in values and v != values['new_password']:
raise ValueError('两次输入的密码不一致')
return v
# 模拟数据库
users_db = {}
user_id_counter = 1
# 依赖项
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> UserResponse:
"""获取当前用户"""
# 这里应该是JWT令牌验证
token = credentials.credentials
# 模拟用户查找
user_id = 1 # 从令牌中解析
if user_id not in users_db:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证令牌"
)
return UserResponse(**users_db[user_id])
async def get_current_admin(
current_user: UserResponse = Depends(get_current_user)
) -> UserResponse:
"""检查是否为管理员"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
return current_user
# API端点
@app.post("/auth/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="用户注册",
description="创建新用户账户")
async def register(user: UserCreate):
"""用户注册"""
global user_id_counter
# 检查用户名是否已存在
for existing_user in users_db.values():
if existing_user["username"] == user.username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="用户名已存在"
)
if existing_user["email"] == user.email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="邮箱已注册"
)
# 创建用户
user_id = user_id_counter
user_data = {
"id": user_id,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"role": UserRole.USER,
"status": UserStatus.ACTIVE,
"created_at": datetime.now(),
"updated_at": None,
"last_login": None,
"login_count": 0,
"failed_login_attempts": 0
}
users_db[user_id] = user_data
user_id_counter += 1
return UserResponse(**user_data)
@app.post("/auth/login",
response_model=LoginResponse,
summary="用户登录",
description="用户登录并获取访问令牌")
async def login(login_data: LoginRequest):
"""用户登录"""
# 模拟用户查找和密码验证
user = None
for user_data in users_db.values():
if (user_data["username"] == login_data.username or
user_data["email"] == login_data.username):
user = user_data
break
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误"
)
# 模拟密码验证(实际应该使用哈希验证)
# if not verify_password(login_data.password, user["hashed_password"]):
# raise HTTPException(...)
# 模拟生成令牌
access_token = str(uuid.uuid4())
expires_in = 3600 # 1小时
# 更新最后登录时间
user["last_login"] = datetime.now()
user["login_count"] += 1
return LoginResponse(
access_token=access_token,
expires_in=expires_in,
user=UserResponse(**user)
)
@app.get("/users/me",
response_model=UserResponse,
summary="获取当前用户信息",
description="获取当前登录用户的详细信息")
async def get_current_user_info(
current_user: UserResponse = Depends(get_current_user)
):
"""获取当前用户信息"""
return current_user
@app.put("/users/me",
response_model=UserResponse,
summary="更新当前用户信息",
description="更新当前登录用户的个人信息")
async def update_current_user_info(
user_update: UserUpdate,
current_user: UserResponse = Depends(get_current_user)
):
"""更新当前用户信息"""
user_id = current_user.id
# 检查用户名是否已被其他用户使用
if user_update.username:
for uid, user_data in users_db.items():
if (uid != user_id and
user_data["username"] == user_update.username):
raise HTTPException(
status_code
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄
💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍
🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)