前端实时协作架构:从 CRDT 到 OT 的冲突解决与一致性保障

一、多人协作的"覆盖困境":最后写入胜出与数据丢失的工程痛点

在线文档、设计工具、代码编辑器等实时协作场景中,最基本的一致性问题是并发编辑冲突。两个用户同时修改同一段文字,如果采用"最后写入胜出"(Last Write Wins)策略,先写入的用户的内容会被覆盖,导致数据丢失。某在线文档工具在早期版本中就出现过这个问题——两个用户同时编辑同一段落的同一行,后保存的用户覆盖了前者的修改,而前者毫不知情。

更复杂的场景是网络分区下的编辑冲突。用户 A 在离线状态下编辑了文档,重新上线后需要将离线编辑与线上版本合并。如果离线期间其他用户也修改了同一位置,合并时如何保证不丢失任何一方的编辑?

两种主流的冲突解决算法是 OT(Operational Transformation)和 CRDT(Conflict-free Replicated Data Types)。OT 通过在应用操作前对操作进行变换来消除冲突,Google Docs 使用的就是 OT。CRDT 通过设计数学性质保证所有操作最终收敛到相同状态,无需变换,Figma 和很多现代协作工具采用 CRDT。两种算法各有优劣,选择取决于业务场景。

二、实时协作架构:从操作传播到冲突解决的三层模型

flowchart TB
    subgraph L1["第一层:操作捕获与传播"]
        A["用户编辑操作<br/>insert/delete/replace"] --> B["操作编码<br/>OpLog 条目"]
        B --> C["WebSocket 广播<br/>同步到其他客户端"]
        C --> D["服务端操作日志<br/>持久化存储"]
    end

    subgraph L2["第二层:冲突检测与解决"]
        D --> E{"并发操作检测<br/>操作位置是否重叠"}
        E -->|"无冲突"| F["直接应用操作"]
        E -->|"有冲突"| G{"冲突解决策略"}
        G -->|"OT"| H["操作变换<br/>调整操作参数"]
        G -->|"CRDT"| I["CRDT 合并<br/>数学保证收敛"]
    end

    subgraph L3["第三层:一致性保障"]
        F --> J["向量时钟<br/>因果顺序保证"]
        H --> J
        I --> J
        J --> K["最终一致性验证<br/>所有副本状态哈希"]
        K --> L["快照与压缩<br/>定期归档 OpLog"]
    end

    style E fill:#f96,stroke:#333
    style G fill:#9cf,stroke:#333
    style K fill:#9f9,stroke:#333

三层模型的设计逻辑:

第一层:操作捕获与传播。将用户的编辑操作(插入、删除、替换)编码为结构化的操作日志条目(OpLog Entry),包含:操作类型、位置、内容、作者、时间戳、版本号。操作通过 WebSocket 实时广播到所有在线客户端,同时持久化到服务端操作日志。离线编辑缓存在本地,重新上线后批量同步。

第二层:冲突检测与解决。当两个操作的目标位置重叠时,判定为并发冲突。OT 的解决方式是"变换"——根据已执行的并发操作调整待执行操作的参数。例如用户 A 在位置 5 插入 "X",用户 B 在位置 3 插入 "Y",两个操作并发到达用户 B 时,B 的插入位置需要从 3 调整为 4(因为 A 的插入使位置后移了一位)。CRDT 的解决方式是"设计"——为每个字符分配唯一标识符,插入操作基于标识符而非位置,删除操作使用墓碑标记而非物理删除,数学性质保证所有副本最终收敛。

第三层:一致性保障。向量时钟记录因果顺序——每个客户端维护一个逻辑时钟向量,操作携带时钟向量,接收方根据向量判断操作的先后顺序。最终一致性通过状态哈希验证——所有客户端定期计算文档状态的哈希值,如果哈希一致则确认收敛。操作日志定期压缩为快照,避免日志无限增长。

三、CRDT 协作引擎的代码实现

from dataclasses import dataclass, field
from typing import Optional
import hashlib

@dataclass
class CharId:
    """字符唯一标识符(Lamport 时间戳 + 客户端 ID)"""
    lamport: int
    client_id: str

    def __lt__(self, other: "CharId") -> bool:
        if self.lamport != other.lamport:
            return self.lamport < other.lamport
        return self.client_id < other.client_id

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, CharId):
            return False
        return (
            self.lamport == other.lamport
            and self.client_id == other.client_id
        )

    def __hash__(self) -> int:
        return hash((self.lamport, self.client_id))

@dataclass
class CharNode:
    """CRDT 字符节点"""
    id: CharId
    value: str                   # 字符内容
    left_id: Optional[CharId] = None   # 左邻居 ID
    right_id: Optional[CharId] = None  # 右邻居 ID
    deleted: bool = False        # 墓碑标记

@dataclass
class Operation:
    """编辑操作"""
    op_type: str                 # insert / delete
    char_id: Optional[CharId] = None
    value: str = ""
    left_id: Optional[CharId] = None
    right_id: Optional[CharId] = None
    origin_lamport: int = 0      # 操作产生时的 Lamport 时钟

class CRDTDocument:
    """基于 CRDT 的协作文档"""

    def __init__(self, client_id: str):
        self.client_id = client_id
        self.lamport_clock: int = 0
        self.chars: dict[CharId, CharNode] = {}

        # 虚拟首尾节点,简化边界处理
        self.start_id = CharId(0, "START")
        self.end_id = CharId(0, "END")
        self.chars[self.start_id] = CharNode(
            id=self.start_id, value="", right_id=self.end_id
        )
        self.chars[self.end_id] = CharNode(
            id=self.end_id, value="", left_id=self.start_id
        )

    def local_insert(
        self, index: int, value: str
    ) -> list[Operation]:
        """本地插入操作,返回操作列表"""
        self.lamport_clock += 1
        ops = []

        # 找到插入位置的左右邻居
        left_id, right_id = self._find_neighbors(index)

        for i, char in enumerate(value):
            char_id = CharId(self.lamport_clock, self.client_id)
            self.lamport_clock += 1

            node = CharNode(
                id=char_id,
                value=char,
                left_id=left_id,
                right_id=right_id,
            )
            self.chars[char_id] = node

            # 更新邻居指针
            if left_id and left_id in self.chars:
                self.chars[left_id].right_id = char_id
            if right_id and right_id in self.chars:
                self.chars[right_id].left_id = char_id

            # 下一个字符的左邻居是当前字符
            left_id = char_id

            ops.append(Operation(
                op_type="insert",
                char_id=char_id,
                value=char,
                left_id=node.left_id,
                right_id=node.right_id,
                origin_lamport=self.lamport_clock,
            ))

        return ops

    def local_delete(self, index: int, length: int = 1) -> list[Operation]:
        """本地删除操作,使用墓碑标记"""
        ops = []
        visible_chars = self._get_visible_chars()

        for i in range(length):
            if index >= len(visible_chars):
                break

            char_node = visible_chars[index]
            char_node.deleted = True

            ops.append(Operation(
                op_type="delete",
                char_id=char_node.id,
                origin_lamport=self.lamport_clock,
            ))

        return ops

    def remote_apply(self, op: Operation):
        """应用远程操作"""
        self.lamport_clock = max(
            self.lamport_clock, op.origin_lamport
        ) + 1

        if op.op_type == "insert":
            self._remote_insert(op)
        elif op.op_type == "delete":
            self._remote_delete(op)

    def _remote_insert(self, op: Operation):
        """应用远程插入操作"""
        if op.char_id in self.chars:
            return  # 幂等:已存在则跳过

        node = CharNode(
            id=op.char_id,
            value=op.value,
            left_id=op.left_id,
            right_id=op.right_id,
            deleted=False,
        )
        self.chars[op.char_id] = node

        # 更新邻居指针
        if op.left_id and op.left_id in self.chars:
            self.chars[op.left_id].right_id = op.char_id
        if op.right_id and op.right_id in self.chars:
            self.chars[op.right_id].left_id = op.char_id

    def _remote_delete(self, op: Operation):
        """应用远程删除操作(墓碑标记)"""
        if op.char_id in self.chars:
            self.chars[op.char_id].deleted = True

    def get_text(self) -> str:
        """获取当前文档文本"""
        return "".join(
            node.value
            for node in self._get_visible_chars()
        )

    def get_state_hash(self) -> str:
        """计算文档状态哈希,用于一致性验证"""
        text = self.get_text()
        return hashlib.sha256(text.encode()).hexdigest()[:16]

    def _find_neighbors(
        self, index: int
    ) -> tuple[Optional[CharId], Optional[CharId]]:
        """找到插入位置的左右邻居 ID"""
        visible = self._get_visible_chars()

        if index == 0:
            return self.start_id, visible[0].id if visible else self.end_id
        elif index >= len(visible):
            return visible[-1].id if visible else self.start_id, self.end_id
        else:
            return visible[index - 1].id, visible[index].id

    def _get_visible_chars(self) -> list[CharNode]:
        """获取所有未删除的字符,按顺序排列"""
        # 从起始节点开始,沿 right_id 链遍历
        result = []
        current_id = self.chars[self.start_id].right_id

        while current_id and current_id != self.end_id:
            if current_id in self.chars:
                node = self.chars[current_id]
                if not node.deleted:
                    result.append(node)
                current_id = node.right_id
            else:
                break

        return result


class OTTransform:
    """OT 操作变换引擎"""

    @staticmethod
    def transform_insert_insert(
        op_a: dict, op_b: dict
    ) -> tuple[dict, dict]:
        """两个插入操作的变换"""
        a_prime = op_a.copy()
        b_prime = op_b.copy()

        if op_a["position"] < op_b["position"]:
            # A 在 B 前面,B 的位置需要后移
            b_prime["position"] += len(op_a["text"])
        elif op_a["position"] > op_b["position"]:
            # A 在 B 后面,A 的位置需要后移
            a_prime["position"] += len(op_b["text"])
        else:
            # 同一位置,按客户端 ID 排序决定先后
            if op_a["client_id"] < op_b["client_id"]:
                b_prime["position"] += len(op_a["text"])
            else:
                a_prime["position"] += len(op_b["text"])

        return a_prime, b_prime

    @staticmethod
    def transform_insert_delete(
        insert_op: dict, delete_op: dict
    ) -> tuple[dict, dict]:
        """插入与删除操作的变换"""
        ins_prime = insert_op.copy()
        del_prime = delete_op.copy()

        if insert_op["position"] <= delete_op["position"]:
            # 插入在删除位置之前,删除位置后移
            del_prime["position"] += len(insert_op["text"])
        elif insert_op["position"] >= delete_op["position"] + delete_op["length"]:
            # 插入在删除范围之后,不受影响
            pass
        else:
            # 插入在删除范围内,删除长度增加
            del_prime["length"] += len(insert_op["text"])

        return ins_prime, del_prime

    @staticmethod
    def transform_delete_delete(
        op_a: dict, op_b: dict
    ) -> tuple[dict, dict]:
        """两个删除操作的变换"""
        a_prime = op_a.copy()
        b_prime = op_b.copy()

        a_start = op_a["position"]
        a_end = op_a["position"] + op_a["length"]
        b_start = op_b["position"]
        b_end = op_b["position"] + op_b["length"]

        if a_end <= b_start:
            # A 在 B 前面,B 的位置前移
            b_prime["position"] -= op_a["length"]
        elif b_end <= a_start:
            # B 在 A 前面,A 的位置前移
            a_prime["position"] -= op_b["length"]
        else:
            # 重叠区域,需要分割处理
            overlap_start = max(a_start, b_start)
            overlap_end = min(a_end, b_end)
            overlap = overlap_end - overlap_start

            if overlap >= op_a["length"]:
                a_prime["length"] = 0  # A 完全被 B 覆盖
            else:
                a_prime["length"] -= overlap

            if overlap >= op_b["length"]:
                b_prime["length"] = 0
            else:
                b_prime["length"] -= overlap

        return a_prime, b_prime

关键设计决策:CRDT 实现采用 RGA(Replicated Growable Array)算法——每个字符有唯一 ID(Lamport 时间戳 + 客户端 ID),字符间通过左右邻居 ID 维持顺序。插入操作基于邻居 ID 而非绝对位置,因此并发插入不会冲突。删除使用墓碑标记而非物理删除,保证其他客户端的邻居引用不会失效。OT 变换引擎处理三种操作组合:插入-插入、插入-删除、删除-删除,每种组合有独立的变换逻辑。

四、协作方案的边界与权衡

CRDT vs OT 的选择:CRDT 的优势是去中心化——无需中央服务器做变换,每个客户端独立解决冲突。劣势是元数据开销大——每个字符需要存储 ID 和邻居引用,内存占用约为原始文本的 3-5 倍。OT 的优势是元数据开销小——操作本身是轻量级的。劣势是需要中央服务器做变换,服务器是单点。对于文档类应用(字符数万级),CRDT 的内存开销可接受;对于代码编辑器(字符数十万级),OT 更合适。

墓碑膨胀:CRDT 的删除操作使用墓碑标记,删除的字符仍占用内存。长时间编辑的文档可能积累大量墓碑,导致内存和性能问题。缓解策略是定期执行"垃圾回收"——当所有客户端确认已同步到某个版本后,可以安全地移除该版本之前的墓碑。

光标同步:当前实现只解决了文本一致性,未涉及光标位置同步。光标位置基于字符 ID 而非绝对位置,当其他用户插入或删除文本时,光标需要跟随移动。光标同步的复杂度不亚于文本同步,需要单独设计。

离线编辑的合并:长时间离线后重新上线,需要将大量本地操作与服务端操作合并。如果离线期间文档被大幅修改,合并后的结果可能不符合用户预期。建议在合并后展示差异视图,让用户确认合并结果。

五、总结

前端实时协作架构通过三层模型——操作捕获与传播、冲突检测与解决、一致性保障——解决了多人并发编辑的核心问题。CRDT 通过唯一标识符和墓碑标记保证数学收敛,OT 通过操作变换消除并发冲突。两者各有适用场景:CRDT 适合去中心化、字符数适中的文档协作,OT 适合需要中央服务器控制、字符数大的代码协作。落地时需注意三点:一是 CRDT 的墓碑膨胀需要定期垃圾回收;二是光标同步需要基于字符 ID 而非绝对位置;三是长时间离线后的合并需要差异确认机制。实时协作的本质是"让每个用户都感觉自己在独占编辑",而冲突解决算法是达成这一目标的数学基础。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐