『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网

CQRS模式实践:从理论到Python实现

引言

在传统的软件架构中,我们通常使用单一的实体模型来同时处理数据的增删改(命令)和查询。这种设计简单直观,但随着业务复杂度提升,问题逐渐显现:复杂的业务逻辑和查询需求混杂在一起,导致模型难以维护;读写负载不均衡时,单一数据库难以同时优化两种操作;安全性和扩展性也受到限制。

CQRS(Command Query Responsibility Segregation,命令查询职责分离) 模式应运而生。它通过将系统的读写操作分离为独立的模型,从根本上解决了上述问题。本文将从理论出发,深入探讨 CQRS 的核心思想、架构模式、优势与挑战,并通过 Python 代码实战,展示如何构建一个基于 CQRS 的订单系统。

1. CQRS 核心概念

CQRS 是由 Greg Young 提出的架构模式,其核心理念非常简单:将修改数据的行为(命令)与读取数据的行为(查询)分离

1.1 命令(Command)

  • 命令代表改变系统状态的意图,例如“创建订单”、“支付订单”。
  • 命令通常以动词开头,包含执行操作所需的数据。
  • 命令执行后不返回数据,只返回执行结果(成功/失败)。
  • 命令侧重于业务规则和一致性。

1.2 查询(Query)

  • 查询代表获取数据的请求,例如“查询订单详情”、“搜索商品”。
  • 查询只读取数据,不修改系统状态。
  • 查询可以返回复杂的数据结构,针对 UI 需求进行优化。
  • 查询侧重于性能和灵活性。

1.3 命令模型与查询模型

在 CQRS 中,命令和查询使用完全不同的数据模型:

  • 命令模型:针对写操作优化,通常遵循领域驱动设计(DDD),包含业务逻辑和规则。
  • 查询模型:针对读操作优化,通常是简单的数据视图,可能由多个表连接而成,甚至使用非关系型数据库。

CQRS 系统

客户端

读模型

写模型

命令

查询

更新

同步机制

读取

用户/系统

命令处理器
Command Handler

查询处理器
Query Handler

写数据库
Orders Write

读数据库
Orders Read

2. CQRS 架构演变

CQRS 可以独立使用,也可以与事件溯源(Event Sourcing)结合,形成强大的架构组合。

2.1 简单 CQRS

最简单的 CQRS 实现是共享同一个数据库,但使用不同的模型(例如 ORM 实体和查询专用的 DTO)。这种“逻辑分离”虽然简单,但无法解决读写数据库的物理分离。

2.2 物理分离的 CQRS

将读写数据库物理分离:命令操作写库(如关系型数据库),查询操作读库(如 Elasticsearch、Redis 或物化视图)。两者之间通过数据同步机制保持一致,例如事件驱动、定时 ETL 等。

2.3 事件溯源 + CQRS

事件溯源将状态变更存储为事件序列,而 CQRS 的写模型负责生成事件,读模型则订阅事件并构建物化视图。这是最经典且强大的组合。

查询端

命令端

命令

加载聚合

生成新事件

事件发布

更新

查询

读取

用户

命令处理器

事件存储

事件处理器

读数据库

用户

查询处理器

3. CQRS 的优势与挑战

3.1 优势

  • 职责分离:命令模型专注于业务规则,查询模型专注于数据展示,代码更清晰。
  • 独立扩展:读操作和写操作可按需独立扩展,例如增加只读副本应对高并发查询。
  • 优化灵活性:可为读模型选择最适合的存储技术(如 Elasticsearch 做全文搜索),为写模型保留强一致性的关系型数据库。
  • 安全性增强:可对读写接口实施不同的安全策略。

3.2 挑战

  • 一致性延迟:读写模型同步存在延迟,系统呈现最终一致性
  • 复杂性增加:需要维护同步机制,可能引入事件驱动架构。
  • 学习曲线:团队成员需要理解 CQRS 和可能的事件溯源概念。

4. Python 实战:构建 CQRS 订单系统

本部分我们将实现一个简化的 CQRS 订单系统,包含以下功能:

  • 命令:创建订单、支付订单。
  • 查询:按订单 ID 查询订单详情、按用户 ID 查询订单列表。

我们采用物理分离的 CQRS:

  • 写存储:使用 SQLite 数据库 orders_write.db,包含符合第三范式的表。
  • 读存储:使用另一个 SQLite 数据库 orders_read.db,包含针对查询优化的表(非规范化)。
  • 同步机制:通过一个简单的内存事件总线,命令执行后发布事件,事件处理器更新读库。

4.1 环境准备

无需额外安装,Python 3.6+ 自带 sqlite3 模块。

4.2 完整代码实现

"""
CQRS 模式实战:订单系统
- 写模型:orders_write.db,遵循范式,包含 orders, payments 表
- 读模型:orders_read.db,非规范化,包含 order_details 表
- 同步:基于内存事件总线,命令执行后发布事件,更新读库
"""

import sqlite3
import json
import uuid
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum


# ==================== 领域模型 ====================
class OrderStatus(Enum):
    PENDING = 'pending'
    PAID = 'paid'
    CANCELLED = 'cancelled'


@dataclass
class Order:
    """订单聚合根"""
    order_id: str
    user_id: str
    total_amount: float
    status: OrderStatus
    created_at: float
    items: List[Dict]  # 商品列表 [{"product_id": "p1", "price": 100, "quantity": 2}]


@dataclass
class Payment:
    """支付实体"""
    payment_id: str
    order_id: str
    amount: float
    paid_at: float


# ==================== 事件定义 ====================
class Event:
    """基础事件"""
    def __init__(self, event_type: str, data: dict):
        self.event_id = str(uuid.uuid4())
        self.event_type = event_type
        self.timestamp = time.time()
        self.data = data


class OrderCreatedEvent(Event):
    def __init__(self, order: Order):
        super().__init__('order.created', asdict(order))


class OrderPaidEvent(Event):
    def __init__(self, order_id: str, payment: Payment):
        super().__init__('order.paid', {
            'order_id': order_id,
            'payment': asdict(payment)
        })


# ==================== 事件总线 ====================
class EventBus:
    """简单内存事件总线"""
    def __init__(self):
        self.handlers = {}
    
    def register(self, event_type: str, handler):
        """注册事件处理器"""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    def publish(self, event: Event):
        """发布事件"""
        handlers = self.handlers.get(event.event_type, [])
        for handler in handlers:
            handler(event)


# ==================== 写模型仓储 ====================
class WriteModelRepository:
    """操作写数据库"""
    
    def __init__(self, db_path: str = 'orders_write.db'):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化写数据库表结构"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # 订单表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS orders (
                    order_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    total_amount REAL NOT NULL,
                    status TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    items TEXT NOT NULL  -- 存储JSON
                )
            ''')
            # 支付表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS payments (
                    payment_id TEXT PRIMARY KEY,
                    order_id TEXT NOT NULL,
                    amount REAL NOT NULL,
                    paid_at REAL NOT NULL,
                    FOREIGN KEY(order_id) REFERENCES orders(order_id)
                )
            ''')
            conn.commit()
    
    def save_order(self, order: Order):
        """保存订单"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO orders 
                (order_id, user_id, total_amount, status, created_at, items)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                order.order_id,
                order.user_id,
                order.total_amount,
                order.status.value,
                order.created_at,
                json.dumps(order.items)
            ))
            conn.commit()
    
    def save_payment(self, payment: Payment):
        """保存支付"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO payments (payment_id, order_id, amount, paid_at)
                VALUES (?, ?, ?, ?)
            ''', (payment.payment_id, payment.order_id, payment.amount, payment.paid_at))
            conn.commit()
            # 更新订单状态为已支付
            cursor.execute('''
                UPDATE orders SET status = ? WHERE order_id = ?
            ''', (OrderStatus.PAID.value, payment.order_id))
            conn.commit()


# ==================== 读模型仓储 ====================
class ReadModelRepository:
    """操作读数据库"""
    
    def __init__(self, db_path: str = 'orders_read.db'):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化读数据库表(反范式设计)"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS order_details (
                    order_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    total_amount REAL NOT NULL,
                    status TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    paid_at REAL,                -- 可能为空
                    payment_id TEXT,              -- 可能为空
                    items TEXT NOT NULL,           -- 商品列表JSON
                    _version INTEGER DEFAULT 0     -- 乐观锁/版本控制
                )
            ''')
            conn.commit()
    
    def upsert_order_detail(self, order_detail: Dict):
        """插入或更新订单详情"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO order_details
                (order_id, user_id, total_amount, status, created_at, paid_at, payment_id, items, _version)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                order_detail['order_id'],
                order_detail['user_id'],
                order_detail['total_amount'],
                order_detail['status'],
                order_detail['created_at'],
                order_detail.get('paid_at'),
                order_detail.get('payment_id'),
                json.dumps(order_detail['items']),
                order_detail.get('_version', 0) + 1
            ))
            conn.commit()
    
    def find_order_by_id(self, order_id: str) -> Optional[Dict]:
        """根据订单ID查询详情"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM order_details WHERE order_id = ?', (order_id,))
            row = cursor.fetchone()
            if row:
                result = dict(row)
                result['items'] = json.loads(result['items'])
                return result
            return None
    
    def find_orders_by_user(self, user_id: str) -> List[Dict]:
        """根据用户ID查询订单列表"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM order_details WHERE user_id = ? ORDER BY created_at DESC', (user_id,))
            rows = cursor.fetchall()
            results = []
            for row in rows:
                result = dict(row)
                result['items'] = json.loads(result['items'])
                results.append(result)
            return results


# ==================== 命令处理器 ====================
class OrderCommandHandler:
    """处理订单相关命令"""
    
    def __init__(self, write_repo: WriteModelRepository, event_bus: EventBus):
        self.write_repo = write_repo
        self.event_bus = event_bus
    
    def handle_create_order(self, user_id: str, items: List[Dict]) -> str:
        """处理创建订单命令"""
        # 计算总金额
        total = sum(item['price'] * item['quantity'] for item in items)
        order = Order(
            order_id=str(uuid.uuid4()),
            user_id=user_id,
            total_amount=total,
            status=OrderStatus.PENDING,
            created_at=time.time(),
            items=items
        )
        # 保存到写库
        self.write_repo.save_order(order)
        # 发布事件
        self.event_bus.publish(OrderCreatedEvent(order))
        return order.order_id
    
    def handle_pay_order(self, order_id: str, amount: float) -> bool:
        """处理支付订单命令"""
        # 实际业务中应验证订单状态等,简化版本直接创建支付
        payment = Payment(
            payment_id=str(uuid.uuid4()),
            order_id=order_id,
            amount=amount,
            paid_at=time.time()
        )
        # 保存支付到写库(也会更新订单状态)
        self.write_repo.save_payment(payment)
        # 发布事件
        self.event_bus.publish(OrderPaidEvent(order_id, payment))
        return True


# ==================== 查询处理器 ====================
class OrderQueryHandler:
    """处理订单查询"""
    
    def __init__(self, read_repo: ReadModelRepository):
        self.read_repo = read_repo
    
    def handle_get_order(self, order_id: str) -> Optional[Dict]:
        """查询单个订单"""
        return self.read_repo.find_order_by_id(order_id)
    
    def handle_list_user_orders(self, user_id: str) -> List[Dict]:
        """查询用户订单列表"""
        return self.read_repo.find_orders_by_user(user_id)


# ==================== 事件处理器(同步读模型)====================
class ReadModelUpdater:
    """订阅事件并更新读模型"""
    
    def __init__(self, read_repo: ReadModelRepository):
        self.read_repo = read_repo
    
    def handle_order_created(self, event: OrderCreatedEvent):
        """处理订单创建事件,更新读库"""
        order_data = event.data
        order_detail = {
            'order_id': order_data['order_id'],
            'user_id': order_data['user_id'],
            'total_amount': order_data['total_amount'],
            'status': order_data['status'],
            'created_at': order_data['created_at'],
            'paid_at': None,
            'payment_id': None,
            'items': order_data['items'],
            '_version': 0
        }
        self.read_repo.upsert_order_detail(order_detail)
        print(f"[ReadModel] 订单 {order_data['order_id']} 已同步")
    
    def handle_order_paid(self, event: OrderPaidEvent):
        """处理订单支付事件,更新读库"""
        order_id = event.data['order_id']
        payment = event.data['payment']
        # 先获取现有订单详情
        existing = self.read_repo.find_order_by_id(order_id)
        if existing:
            existing['status'] = OrderStatus.PAID.value
            existing['paid_at'] = payment['paid_at']
            existing['payment_id'] = payment['payment_id']
            existing['_version'] = existing.get('_version', 0) + 1
            self.read_repo.upsert_order_detail(existing)
            print(f"[ReadModel] 订单 {order_id} 支付状态已同步")
        else:
            # 理论上不应发生,但为健壮性,可从写库重建(略)
            print(f"[ReadModel] 警告:订单 {order_id} 不存在于读库,无法更新支付")


# ==================== 主程序示例 ====================
def main():
    """演示CQRS流程"""
    print("=" * 60)
    print("CQRS 模式演示 - 订单系统")
    print("=" * 60)
    
    # 初始化基础设施
    write_repo = WriteModelRepository()
    read_repo = ReadModelRepository()
    event_bus = EventBus()
    
    # 初始化事件处理器
    updater = ReadModelUpdater(read_repo)
    event_bus.register('order.created', updater.handle_order_created)
    event_bus.register('order.paid', updater.handle_order_paid)
    
    # 初始化命令处理器和查询处理器
    cmd_handler = OrderCommandHandler(write_repo, event_bus)
    query_handler = OrderQueryHandler(read_repo)
    
    # 1. 创建订单
    print("\n--- 执行命令:创建订单 ---")
    items = [
        {"product_id": "p1001", "name": "笔记本电脑", "price": 5999.00, "quantity": 1},
        {"product_id": "p1002", "name": "无线鼠标", "price": 99.00, "quantity": 2}
    ]
    order_id = cmd_handler.handle_create_order("user_123", items)
    print(f"订单创建成功: {order_id}")
    
    # 稍等片刻,让事件异步处理(实际是同步的,因为事件总线是同步的,但为演示效果)
    time.sleep(0.5)
    
    # 2. 查询订单(立即查询,应能看到刚创建的订单)
    print("\n--- 执行查询:查询订单详情 ---")
    order = query_handler.handle_get_order(order_id)
    if order:
        print(f"订单详情: ID={order['order_id']}, 用户={order['user_id']}, "
              f"金额={order['total_amount']}, 状态={order['status']}")
        print(f"商品: {order['items']}")
    
    # 3. 支付订单
    print("\n--- 执行命令:支付订单 ---")
    cmd_handler.handle_pay_order(order_id, 5999.00 + 99.00*2)
    
    time.sleep(0.5)
    
    # 4. 再次查询订单,状态应为已支付
    print("\n--- 执行查询:查询更新后的订单 ---")
    order = query_handler.handle_get_order(order_id)
    if order:
        print(f"订单详情: ID={order['order_id']}, 状态={order['status']}, "
              f"支付时间={order['paid_at']}")
    
    # 5. 查询用户的所有订单
    print("\n--- 执行查询:用户订单列表 ---")
    orders = query_handler.handle_list_user_orders("user_123")
    print(f"用户 user_123 共有 {len(orders)} 个订单:")
    for o in orders:
        print(f"  - {o['order_id']} 状态:{o['status']} 金额:{o['total_amount']}")
    
    print("\n" + "=" * 60)
    print("演示完成")


if __name__ == "__main__":
    main()

4.3 代码说明

  1. 领域模型与事件:定义了 OrderPayment 数据类,以及 OrderCreatedEventOrderPaidEvent 事件。
  2. 写模型仓储WriteModelRepository 操作 orders_write.db,包含 orders 和 payments 表,符合范式。save_ordersave_payment 负责持久化。
  3. 读模型仓储ReadModelRepository 操作 orders_read.db,使用反范式的 order_details 表,包含所有查询需要的字段。upsert_order_detail 更新读库。
  4. 事件总线:简单的内存事件总线 EventBus,支持注册和发布事件。
  5. 命令处理器OrderCommandHandler 处理 create_orderpay_order 命令。命令执行后调用写仓储,并发布相应事件。
  6. 查询处理器OrderQueryHandler 处理查询,直接读取读库。
  7. 读模型更新器ReadModelUpdater 订阅事件,根据事件更新读库中的物化视图,保持最终一致性。
  8. 主程序:演示了创建订单、支付、查询的完整流程,验证了读写分离和同步机制。

5. 代码自查与注意事项

  • 数据库初始化:每次运行都会创建表,但不会清空现有数据,多次运行可能导致数据重复。实际生产环境应有迁移管理。
  • 事件总线:此处为同步内存实现,实际系统应使用消息队列(如 RabbitMQ)实现可靠异步通信。
  • 错误处理:未处理并发冲突(如版本号),真实环境应在更新读库时使用乐观锁。
  • 幂等性:事件处理器应设计为幂等,避免重复事件导致数据错误。
  • 最终一致性:命令执行后立即查询可能读不到最新数据(因为事件还未处理完),本示例中事件是同步的,所以能读到。但在真实异步场景,需要接受短暂延迟。

6. CQRS 模式总结

CQRS 是一种强大的架构模式,特别适用于以下场景:

  • 复杂的业务领域,读写逻辑差异大。
  • 高并发系统,需要独立扩展读写能力。
  • 需要为不同客户端(Web、移动、报表)提供不同数据视图。

通过将读写模型分离,CQRS 提升了系统的可维护性、可扩展性和性能。但也要注意其引入的复杂性,只有在真正需要时才应使用。

最佳实践

  1. 明确边界:只在核心业务模块使用 CQRS,不要过度设计。
  2. 选择同步机制:根据一致性要求,选择事件驱动、CDC(变更数据捕获)或定时批处理。
  3. 监控数据延迟:对最终一致性系统,监控同步延迟,设置告警阈值。
  4. 结合事件溯源:若需要完整审计日志或历史状态重建,可引入事件溯源。

CQRS 并非银弹,但合理运用能显著改善系统架构。希望通过本文的理论与实战,你能在自己的项目中灵活应用这一模式。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐