如何设计 CRDT 算法
December 23, 2021 · 18 min read
Marc Shapiro 在论文 Conflict-free replicated data types 中正式定义了两种 CRDT 方法,基于操作的(Operation-based) 和基于状态的(State-based),以及他们的充分条件。从而依据充分条件就可以更轻松明确地设计出 CRDT 算法。下文将介绍这些充分条件
PS: 为了方便理解对原文进行了简化,严谨的推导和证明见 CRDT 原文。
术语 & 一些定义
为了深入 CRDT 的设计原则,我们需要先清晰定义问题。
我们考虑这样一个异步网络系统:系统中有多个副本(replica),每个副本代表一个进程/一个计算机,副本可能会崩溃,我们用正确的副本指未崩溃的副本。一个副本可能会崩溃之后永不恢复,或者保持内存完整性的情况下地恢复。副本之间可能会被分区。
最终一致性 - Eventual Consistency
最终一致性由三个属性构成
- Eventual Delivery: 发布到一个正确的副本上的更新列表最终将被传达至所有正确的副本
- 收敛 Convergence: 收到了一样的更新列表的正确副本们最终状态将一致
- 终止 Termination: 所有执行的方法都会终止(即保证算法能在有限时间内完成计算,而不是要进行 次计算来遍历整个状态空间的这种算法)
💪 强最终一致性 - Strong Eventual Consistency (SEC)
强最终一致性定义为:满足「最终一致性 Eventual Consistency」且具有「强收敛性 Strong Convergence」。
- 强收敛 Strong Convergence: 收到了一样的更新列表的正确副本们的状态一致
「强最终一致性 SEC」和「最终一致性 EC」的区别在于 EC 有可能要求用户解决冲突,而 SEC 是不会发生冲突的。
CRDT
CRDT 的定义为:满足「强最终一致性」的数据类型。
偏序集 - Partial Order
给定集合S,“≤”是S上的二元关系,若“≤”满足:
- 自反性:∀a∈S,有a≤a;
- 反对称性:∀a,b∈S,a≤b且b≤a,则a=b;
- 传递性:∀a,b,c∈S,a≤b且b≤c,则a≤c;
则称集合 S 为偏序集
例如下图是以包含关系定义的集合上的偏序,A -> B 的箭头表示 A ≤ B。而 {x}
和 {y}
二者之间是不可比较的。
半格 - Semilatice
- 半格是一个偏序集
- 每个非空集合都有上确界或者下确界
- 上确界被称为 Least upper bound (LUB)
两种 CRDT 类型
基于状态的 CRDT (State-based CRDT, CvRDT)
设计一个基于状态的 CRDT 中需要先定义一个 State-based Object。当它满足一定的性质时就能成为 State-based CRDT 👏。State-based Object 的定义包括:
State
副本的内部状态的类型state_zero
内部状态的初始值≤
定义了 state 之间的顺序update(s, u)
定义 state 的更新方式merge(s, s')
函数可以用于合并两个状态得到新状态- 副本之间通过传递自己的 state,并进行 merge 操作来达到一致性
单调半格对象 - Monotonic Semilattice Object
指一个 Stated-base Object,如果它满足以下性质就称其为单调半格
- 该对象集合是一个以 ≤ 为顺序的半格
- 合并「本地状态 s」和「远端状态 s’」的方式为计算二者的上确界(LUB),即 merge(s, s’) =
- 在所有的更新中,s 都是单调递增的,即 s ≤ update(s, u)
单调半格示意:从左到右单向递增,合并操作被定义为最小上确界(LUB),所有改动都 deliver 之后状态一致
定理
定理:假设有 Eventual delivery 和 Termination 的属性,那么任意单调半格的 State-based 对象都是有强最终一致性(Strong Eventual Consistency)的。
直觉上我们应该如何理解这个定理呢?
观察到最小上确界操作 符合以下性质:
- 结合律: =
- 交换律: =
从而收到同样的更新列表 u = ,但更新顺序不同的副本,他们的最终状态因为有交换律和结合律都等于 ,所以此时所有副本的状态都一致,符合 SEC。详细的定理证明见原文。
如何设计 State-based CRDT
- 保证系统满足 Eventual delivery 和 Termination
- state 在 ≤ 上满足偏序集的要求
- 对任意的 s, u 满足
s ≤ update(s, u)
merge(s, s')
得到的是两个状态的上确界
基于操作的 CRDT(Op-based CRDT, CmRDT)
同样的设计一个基于操作的 CRDT 中需要先定义一个 Op-based Object。当它满足一定的性质时就能成为 Op-based CRDT。Op-based Object 的定义包括:
state
是每个副本的内部状态state_zero
内部状态的初始值op
是每个原子操作的类型,副本直接通过传递 op 来达到同步apply_op(state, op)
是在一个 state 上应用 op 的函数,返回新的状态;op 的交换律指的就是apply_op(apply_op(state, opA), opB) == apply_op(apply_op(state, opB), opA)
check_state(state, op)
确认一个状态是否满足应用 op 的前置条件- Op-based CRDT 的每一个操作 op 都有对应的前置状态检查函数 check_state,在应用 op 之前需要检查 check_state 函数是否满足,如果不满足就将阻塞延迟
check_state
函数的目的是为了保证 op 所依赖的因果顺序成立,例如删除 x 节点的操作依赖于 x 节点被创建的操作,否则就无法应用该操作。这也意味着 Op-based CRDT 的使用者有责任证明每个操作的前置状态都是能够得到满足的
- 副本之间通过传递彼此缺失的 op,并进行 apply_op 来达到最终一致性
并行的操作 - Concurrent Operation
根据每个 op 在副本上的执行顺序我们可以定义 op 之间的偏序关系:
- 如果一个 op A 在某个副本上先于 op B 执行,那么 A < B
- 如果既没有 A < B 也没有 B < A,那么我们就称 A 和 B 是并行的操作
定理
构建一个 Op-based CRDT 的充分条件为:
假设所有操作都按照因果顺序 deliver,且所有更新函数都会终止(满足 Termination)。那么所有满足以下条件的 op-based object 就具有强最终一致性(SEC)
- 所有的并行的操作都满足交换律
- 每个操作的前置条件都能通过按因果顺序应用得到满足
假设所有操作都满足 Eventual Deliver (按照随机顺序 deliver),且所有更新函数都会终止(满足 Termination)。那么所有满足以下条件的 op-based object 就具有强最终一致性(SEC)
- 所有的操作都满足交换律
如何直观理解该定理: 如果有可能冲突的 Op 都是可以交换的,那么它们在一个副本上的应用顺序就对 Object 的最终状态没有影响,那么任意两个应用了同样的 op 集合的 Op-based CRDT 就都具有一致的状态,从而满足 SEC。
详细的定理证明见原文。
如何设计 Op-based CRDT
- 保证系统满足 Eventual delivery 和 Termination
- 保证可能出现并行的 Operation 都满足交换律(不管先应用哪个 Op,最终状态都一致)
- 在应用 Op 时需要保证该 Op 所依赖的前置状态得到满足
CmRDT 和 CvRDT 是等价的
在形式上,Op-based CRDT 和 State-based 是可以互相转换的。思路为:
- 通过 Op-based CRDT 构建 State-based CRDT 的方式为:
- 将新的 State-based Object 的 state 定义为一个二元组(s, M),s 和 Op-based CRDT 的内部状态一致,M 是 Op-based CRDT 的内部 Op 的集合。
- 将新的 State-based Object 的 merge 操作定义为
merge((s, M), (s', M')) = apply_ops(s, M' - M)
- 通过 State-based CRDT 构建 Op-based CRDT 的方式为:
- 将新的 Op-based object 的 Op 定义为 State-based CRDT 的 State
- 将
apply_op
的操作定义为apply_op(state, op) = merge(state, op)
,而 merge 是服从对称性的操作,从而我们能够满足 SEC 得到一个 Op-based CRDT
设计 CRDT
根据上文我们有两种设计 CRDT 的方法,汇总为
如何设计一个 State-based CRDT
需要定义的内容 | 描述 |
---|---|
State | 内部状态的类型是什么 |
state_zero: State | 初始状态 |
≤ | 状态之间的偏序关系 |
merge(s0: State, s1: State): State | 合并两个状态的方法,应得到 s 和 s’ 的最小上确界 |
Update | 更新的类型 |
update(s: State, u: Update): State | 定义状态的更新方式,. s ≤ update(s, u) |
同时我们要保证
- Eventual Delivery
- 每个函数都会终止
如何设计一个 Op-based CRDT
假设 op 不一定按照因果顺序 deliver,那么设计的方式为
需要定义的内容 | 描述 |
---|---|
State | 内部状态的类型是什么 |
state_zero: State | 初始状态 |
Op | 操作的类型 |
apply(state: State, op: Op): State | 应用一个操作的函数,需要满足交换律,即 . apply(apply(s, ), ) = apply(apply(s, ), ) |
check_state(s: State, op: Op): boolean | 确认一个 op 所依赖的前置条件是否被满足 |
同时我们要保证
- 所有的 Op 最终都会被发布到每一个副本上
- 每个函数都会终止
- 保证每个 op 的前置条件最终都能得到满足
设计一个可增加删除元素的 Set CRDT
基于上面的 CRDT 框架我们已经有很好的理论支持我们设计一个 Last-write-wins Set,即出现同时删除和添加同一个元素时后写入的操作将覆盖先写入的操作。
但是在设计之前需要先了解在分布式系统中我们该如何判断事件的“先后”。
Lamport Timestamp
在中心化的系统中我们可以通过事件到达中心服务器的时间作为时间发生的时间戳来判断先后。但是在分布式环境中这种方式就不能使用了。此时我们可以使用 Lamport Timestamp。
Lamport Timestamp 的算法很简单
- 每个进程维护一个 counter
- 本地每发生一个事件就将 counter + 1,并将事件的时间戳设置为 counter 值
- 每当进程发送一个消息,就将本地 counter + 1,并将最新的 counter 值附带在消息上
- 当进程收到消息后,让自己的
counter = max(counter, message.counter) + 1
从而每一个消息都有一个明确的时间戳,根据时间戳我们就能够得到消息间的全序关系。但为了能够处理消息的 counter 相同的情况,我们还需要在消息中带上进程自己的 id,从而能够将全序关系定义为:
a.counter == b.counter?
a.pid < b.pid : a.counter < b.counter
Lamport timestamp,每个进程都会维护自己的计数器
实现
以 Op-based CRDT 的思路设计 Last-write-wins Set(LWWSet):
- 通过 Lamport timestamp 定义 Op 之间的全序关系
- 存在共同修改同一个元素的操作时,让 timestamp 更大的 Op 获胜
- 如果 Timestamp 一致,则让 pid 更大的 Op 获胜
TypeScript 实现代码
interface Op {
time: number;
pid: number;
type: "add" | "remove";
value: string;
}
type State = Map<string, Op>;
type ProcessState = { counter: number; state: State };
const state_zero: State = new Map();
const process_state_zero: ProcessState = {counter: 0, state: state_zero};
function apply(state:ProcessState, op: Op): ProcessState {
const new_state: ProcessState = {
counter: Math.max(state.counter, op.time) + 1,
state: new Map(state.state),
};
if (!new_state.state.has(op.value)) {
new_state.state.set(op.value, op);
} else {
const old_op = state.state.get(op.value);
if (
old_op.time < op.time ||
(old_op.time === op.time && old_op.pid < op.pid)
) {
new_state.state.set(op.value, op);
}
}
return new_state;
}
function check_state(state: State, op: Op) {
return true;
}
// 判断 LWWSet 是否含有某个值
function has(state: State, v: string): boolean {
return state.get(v)?.type === "add";
}
// 插入 v 到 LWWSet
function add(state: ProcessState, v: string): ProcessState {
return apply(state, {
time: state.counter,
pid: getPid(),
type: "add",
value: v,
});
}
// 删除 LWWSet 中的 v
function remove(state: ProcessState, v: string): ProcessState {
return apply(state, {
time: state.counter,
pid: getPid(),
type: "remove",
value: v,
});
}