CQRS模式实践
目录
『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网
CQRS模式实践:从理论到Python实现
引言
在传统的软件架构中,我们通常使用单一的实体模型来同时处理数据的增删改(命令)和查询。这种设计简单直观,但随着业务复杂度提升,问题逐渐显现:复杂的业务逻辑和查询需求混杂在一起,导致模型难以维护;读写负载不均衡时,单一数据库难以同时优化两种操作;安全性和扩展性也受到限制。
CQRS(Command Query Responsibility Segregation,命令查询职责分离) 模式应运而生。它通过将系统的读写操作分离为独立的模型,从根本上解决了上述问题。本文将从理论出发,深入探讨 CQRS 的核心思想、架构模式、优势与挑战,并通过 Python 代码实战,展示如何构建一个基于 CQRS 的订单系统。
1. CQRS 核心概念
CQRS 是由 Greg Young 提出的架构模式,其核心理念非常简单:将修改数据的行为(命令)与读取数据的行为(查询)分离。
1.1 命令(Command)
- 命令代表改变系统状态的意图,例如“创建订单”、“支付订单”。
- 命令通常以动词开头,包含执行操作所需的数据。
- 命令执行后不返回数据,只返回执行结果(成功/失败)。
- 命令侧重于业务规则和一致性。
1.2 查询(Query)
- 查询代表获取数据的请求,例如“查询订单详情”、“搜索商品”。
- 查询只读取数据,不修改系统状态。
- 查询可以返回复杂的数据结构,针对 UI 需求进行优化。
- 查询侧重于性能和灵活性。
1.3 命令模型与查询模型
在 CQRS 中,命令和查询使用完全不同的数据模型:
- 命令模型:针对写操作优化,通常遵循领域驱动设计(DDD),包含业务逻辑和规则。
- 查询模型:针对读操作优化,通常是简单的数据视图,可能由多个表连接而成,甚至使用非关系型数据库。
2. CQRS 架构演变
CQRS 可以独立使用,也可以与事件溯源(Event Sourcing)结合,形成强大的架构组合。
2.1 简单 CQRS
最简单的 CQRS 实现是共享同一个数据库,但使用不同的模型(例如 ORM 实体和查询专用的 DTO)。这种“逻辑分离”虽然简单,但无法解决读写数据库的物理分离。
2.2 物理分离的 CQRS
将读写数据库物理分离:命令操作写库(如关系型数据库),查询操作读库(如 Elasticsearch、Redis 或物化视图)。两者之间通过数据同步机制保持一致,例如事件驱动、定时 ETL 等。
2.3 事件溯源 + CQRS
事件溯源将状态变更存储为事件序列,而 CQRS 的写模型负责生成事件,读模型则订阅事件并构建物化视图。这是最经典且强大的组合。
3. CQRS 的优势与挑战
3.1 优势
- 职责分离:命令模型专注于业务规则,查询模型专注于数据展示,代码更清晰。
- 独立扩展:读操作和写操作可按需独立扩展,例如增加只读副本应对高并发查询。
- 优化灵活性:可为读模型选择最适合的存储技术(如 Elasticsearch 做全文搜索),为写模型保留强一致性的关系型数据库。
- 安全性增强:可对读写接口实施不同的安全策略。
3.2 挑战
- 一致性延迟:读写模型同步存在延迟,系统呈现最终一致性。
- 复杂性增加:需要维护同步机制,可能引入事件驱动架构。
- 学习曲线:团队成员需要理解 CQRS 和可能的事件溯源概念。
4. Python 实战:构建 CQRS 订单系统
本部分我们将实现一个简化的 CQRS 订单系统,包含以下功能:
- 命令:创建订单、支付订单。
- 查询:按订单 ID 查询订单详情、按用户 ID 查询订单列表。
我们采用物理分离的 CQRS:
- 写存储:使用 SQLite 数据库
orders_write.db,包含符合第三范式的表。 - 读存储:使用另一个 SQLite 数据库
orders_read.db,包含针对查询优化的表(非规范化)。 - 同步机制:通过一个简单的内存事件总线,命令执行后发布事件,事件处理器更新读库。
4.1 环境准备
无需额外安装,Python 3.6+ 自带 sqlite3 模块。
4.2 完整代码实现
"""
CQRS 模式实战:订单系统
- 写模型:orders_write.db,遵循范式,包含 orders, payments 表
- 读模型:orders_read.db,非规范化,包含 order_details 表
- 同步:基于内存事件总线,命令执行后发布事件,更新读库
"""
import sqlite3
import json
import uuid
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
# ==================== 领域模型 ====================
class OrderStatus(Enum):
PENDING = 'pending'
PAID = 'paid'
CANCELLED = 'cancelled'
@dataclass
class Order:
"""订单聚合根"""
order_id: str
user_id: str
total_amount: float
status: OrderStatus
created_at: float
items: List[Dict] # 商品列表 [{"product_id": "p1", "price": 100, "quantity": 2}]
@dataclass
class Payment:
"""支付实体"""
payment_id: str
order_id: str
amount: float
paid_at: float
# ==================== 事件定义 ====================
class Event:
"""基础事件"""
def __init__(self, event_type: str, data: dict):
self.event_id = str(uuid.uuid4())
self.event_type = event_type
self.timestamp = time.time()
self.data = data
class OrderCreatedEvent(Event):
def __init__(self, order: Order):
super().__init__('order.created', asdict(order))
class OrderPaidEvent(Event):
def __init__(self, order_id: str, payment: Payment):
super().__init__('order.paid', {
'order_id': order_id,
'payment': asdict(payment)
})
# ==================== 事件总线 ====================
class EventBus:
"""简单内存事件总线"""
def __init__(self):
self.handlers = {}
def register(self, event_type: str, handler):
"""注册事件处理器"""
if event_type not in self.handlers:
self.handlers[event_type] = []
self.handlers[event_type].append(handler)
def publish(self, event: Event):
"""发布事件"""
handlers = self.handlers.get(event.event_type, [])
for handler in handlers:
handler(event)
# ==================== 写模型仓储 ====================
class WriteModelRepository:
"""操作写数据库"""
def __init__(self, db_path: str = 'orders_write.db'):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化写数据库表结构"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 订单表
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
total_amount REAL NOT NULL,
status TEXT NOT NULL,
created_at REAL NOT NULL,
items TEXT NOT NULL -- 存储JSON
)
''')
# 支付表
cursor.execute('''
CREATE TABLE IF NOT EXISTS payments (
payment_id TEXT PRIMARY KEY,
order_id TEXT NOT NULL,
amount REAL NOT NULL,
paid_at REAL NOT NULL,
FOREIGN KEY(order_id) REFERENCES orders(order_id)
)
''')
conn.commit()
def save_order(self, order: Order):
"""保存订单"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO orders
(order_id, user_id, total_amount, status, created_at, items)
VALUES (?, ?, ?, ?, ?, ?)
''', (
order.order_id,
order.user_id,
order.total_amount,
order.status.value,
order.created_at,
json.dumps(order.items)
))
conn.commit()
def save_payment(self, payment: Payment):
"""保存支付"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO payments (payment_id, order_id, amount, paid_at)
VALUES (?, ?, ?, ?)
''', (payment.payment_id, payment.order_id, payment.amount, payment.paid_at))
conn.commit()
# 更新订单状态为已支付
cursor.execute('''
UPDATE orders SET status = ? WHERE order_id = ?
''', (OrderStatus.PAID.value, payment.order_id))
conn.commit()
# ==================== 读模型仓储 ====================
class ReadModelRepository:
"""操作读数据库"""
def __init__(self, db_path: str = 'orders_read.db'):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化读数据库表(反范式设计)"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS order_details (
order_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
total_amount REAL NOT NULL,
status TEXT NOT NULL,
created_at REAL NOT NULL,
paid_at REAL, -- 可能为空
payment_id TEXT, -- 可能为空
items TEXT NOT NULL, -- 商品列表JSON
_version INTEGER DEFAULT 0 -- 乐观锁/版本控制
)
''')
conn.commit()
def upsert_order_detail(self, order_detail: Dict):
"""插入或更新订单详情"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO order_details
(order_id, user_id, total_amount, status, created_at, paid_at, payment_id, items, _version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
order_detail['order_id'],
order_detail['user_id'],
order_detail['total_amount'],
order_detail['status'],
order_detail['created_at'],
order_detail.get('paid_at'),
order_detail.get('payment_id'),
json.dumps(order_detail['items']),
order_detail.get('_version', 0) + 1
))
conn.commit()
def find_order_by_id(self, order_id: str) -> Optional[Dict]:
"""根据订单ID查询详情"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM order_details WHERE order_id = ?', (order_id,))
row = cursor.fetchone()
if row:
result = dict(row)
result['items'] = json.loads(result['items'])
return result
return None
def find_orders_by_user(self, user_id: str) -> List[Dict]:
"""根据用户ID查询订单列表"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM order_details WHERE user_id = ? ORDER BY created_at DESC', (user_id,))
rows = cursor.fetchall()
results = []
for row in rows:
result = dict(row)
result['items'] = json.loads(result['items'])
results.append(result)
return results
# ==================== 命令处理器 ====================
class OrderCommandHandler:
"""处理订单相关命令"""
def __init__(self, write_repo: WriteModelRepository, event_bus: EventBus):
self.write_repo = write_repo
self.event_bus = event_bus
def handle_create_order(self, user_id: str, items: List[Dict]) -> str:
"""处理创建订单命令"""
# 计算总金额
total = sum(item['price'] * item['quantity'] for item in items)
order = Order(
order_id=str(uuid.uuid4()),
user_id=user_id,
total_amount=total,
status=OrderStatus.PENDING,
created_at=time.time(),
items=items
)
# 保存到写库
self.write_repo.save_order(order)
# 发布事件
self.event_bus.publish(OrderCreatedEvent(order))
return order.order_id
def handle_pay_order(self, order_id: str, amount: float) -> bool:
"""处理支付订单命令"""
# 实际业务中应验证订单状态等,简化版本直接创建支付
payment = Payment(
payment_id=str(uuid.uuid4()),
order_id=order_id,
amount=amount,
paid_at=time.time()
)
# 保存支付到写库(也会更新订单状态)
self.write_repo.save_payment(payment)
# 发布事件
self.event_bus.publish(OrderPaidEvent(order_id, payment))
return True
# ==================== 查询处理器 ====================
class OrderQueryHandler:
"""处理订单查询"""
def __init__(self, read_repo: ReadModelRepository):
self.read_repo = read_repo
def handle_get_order(self, order_id: str) -> Optional[Dict]:
"""查询单个订单"""
return self.read_repo.find_order_by_id(order_id)
def handle_list_user_orders(self, user_id: str) -> List[Dict]:
"""查询用户订单列表"""
return self.read_repo.find_orders_by_user(user_id)
# ==================== 事件处理器(同步读模型)====================
class ReadModelUpdater:
"""订阅事件并更新读模型"""
def __init__(self, read_repo: ReadModelRepository):
self.read_repo = read_repo
def handle_order_created(self, event: OrderCreatedEvent):
"""处理订单创建事件,更新读库"""
order_data = event.data
order_detail = {
'order_id': order_data['order_id'],
'user_id': order_data['user_id'],
'total_amount': order_data['total_amount'],
'status': order_data['status'],
'created_at': order_data['created_at'],
'paid_at': None,
'payment_id': None,
'items': order_data['items'],
'_version': 0
}
self.read_repo.upsert_order_detail(order_detail)
print(f"[ReadModel] 订单 {order_data['order_id']} 已同步")
def handle_order_paid(self, event: OrderPaidEvent):
"""处理订单支付事件,更新读库"""
order_id = event.data['order_id']
payment = event.data['payment']
# 先获取现有订单详情
existing = self.read_repo.find_order_by_id(order_id)
if existing:
existing['status'] = OrderStatus.PAID.value
existing['paid_at'] = payment['paid_at']
existing['payment_id'] = payment['payment_id']
existing['_version'] = existing.get('_version', 0) + 1
self.read_repo.upsert_order_detail(existing)
print(f"[ReadModel] 订单 {order_id} 支付状态已同步")
else:
# 理论上不应发生,但为健壮性,可从写库重建(略)
print(f"[ReadModel] 警告:订单 {order_id} 不存在于读库,无法更新支付")
# ==================== 主程序示例 ====================
def main():
"""演示CQRS流程"""
print("=" * 60)
print("CQRS 模式演示 - 订单系统")
print("=" * 60)
# 初始化基础设施
write_repo = WriteModelRepository()
read_repo = ReadModelRepository()
event_bus = EventBus()
# 初始化事件处理器
updater = ReadModelUpdater(read_repo)
event_bus.register('order.created', updater.handle_order_created)
event_bus.register('order.paid', updater.handle_order_paid)
# 初始化命令处理器和查询处理器
cmd_handler = OrderCommandHandler(write_repo, event_bus)
query_handler = OrderQueryHandler(read_repo)
# 1. 创建订单
print("\n--- 执行命令:创建订单 ---")
items = [
{"product_id": "p1001", "name": "笔记本电脑", "price": 5999.00, "quantity": 1},
{"product_id": "p1002", "name": "无线鼠标", "price": 99.00, "quantity": 2}
]
order_id = cmd_handler.handle_create_order("user_123", items)
print(f"订单创建成功: {order_id}")
# 稍等片刻,让事件异步处理(实际是同步的,因为事件总线是同步的,但为演示效果)
time.sleep(0.5)
# 2. 查询订单(立即查询,应能看到刚创建的订单)
print("\n--- 执行查询:查询订单详情 ---")
order = query_handler.handle_get_order(order_id)
if order:
print(f"订单详情: ID={order['order_id']}, 用户={order['user_id']}, "
f"金额={order['total_amount']}, 状态={order['status']}")
print(f"商品: {order['items']}")
# 3. 支付订单
print("\n--- 执行命令:支付订单 ---")
cmd_handler.handle_pay_order(order_id, 5999.00 + 99.00*2)
time.sleep(0.5)
# 4. 再次查询订单,状态应为已支付
print("\n--- 执行查询:查询更新后的订单 ---")
order = query_handler.handle_get_order(order_id)
if order:
print(f"订单详情: ID={order['order_id']}, 状态={order['status']}, "
f"支付时间={order['paid_at']}")
# 5. 查询用户的所有订单
print("\n--- 执行查询:用户订单列表 ---")
orders = query_handler.handle_list_user_orders("user_123")
print(f"用户 user_123 共有 {len(orders)} 个订单:")
for o in orders:
print(f" - {o['order_id']} 状态:{o['status']} 金额:{o['total_amount']}")
print("\n" + "=" * 60)
print("演示完成")
if __name__ == "__main__":
main()
4.3 代码说明
- 领域模型与事件:定义了
Order和Payment数据类,以及OrderCreatedEvent和OrderPaidEvent事件。 - 写模型仓储:
WriteModelRepository操作orders_write.db,包含 orders 和 payments 表,符合范式。save_order和save_payment负责持久化。 - 读模型仓储:
ReadModelRepository操作orders_read.db,使用反范式的order_details表,包含所有查询需要的字段。upsert_order_detail更新读库。 - 事件总线:简单的内存事件总线
EventBus,支持注册和发布事件。 - 命令处理器:
OrderCommandHandler处理create_order和pay_order命令。命令执行后调用写仓储,并发布相应事件。 - 查询处理器:
OrderQueryHandler处理查询,直接读取读库。 - 读模型更新器:
ReadModelUpdater订阅事件,根据事件更新读库中的物化视图,保持最终一致性。 - 主程序:演示了创建订单、支付、查询的完整流程,验证了读写分离和同步机制。
5. 代码自查与注意事项
- 数据库初始化:每次运行都会创建表,但不会清空现有数据,多次运行可能导致数据重复。实际生产环境应有迁移管理。
- 事件总线:此处为同步内存实现,实际系统应使用消息队列(如 RabbitMQ)实现可靠异步通信。
- 错误处理:未处理并发冲突(如版本号),真实环境应在更新读库时使用乐观锁。
- 幂等性:事件处理器应设计为幂等,避免重复事件导致数据错误。
- 最终一致性:命令执行后立即查询可能读不到最新数据(因为事件还未处理完),本示例中事件是同步的,所以能读到。但在真实异步场景,需要接受短暂延迟。
6. CQRS 模式总结
CQRS 是一种强大的架构模式,特别适用于以下场景:
- 复杂的业务领域,读写逻辑差异大。
- 高并发系统,需要独立扩展读写能力。
- 需要为不同客户端(Web、移动、报表)提供不同数据视图。
通过将读写模型分离,CQRS 提升了系统的可维护性、可扩展性和性能。但也要注意其引入的复杂性,只有在真正需要时才应使用。
最佳实践
- 明确边界:只在核心业务模块使用 CQRS,不要过度设计。
- 选择同步机制:根据一致性要求,选择事件驱动、CDC(变更数据捕获)或定时批处理。
- 监控数据延迟:对最终一致性系统,监控同步延迟,设置告警阈值。
- 结合事件溯源:若需要完整审计日志或历史状态重建,可引入事件溯源。
CQRS 并非银弹,但合理运用能显著改善系统架构。希望通过本文的理论与实战,你能在自己的项目中灵活应用这一模式。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)