考虑一个复制的键值存储的简单例子。集群的领导者处理所有对键值存储的写入。它将写入请求保存在预写日志中。预写日志使用领导者和追随者进行复制。在到达高水位标记时,领导者将预写日志的条目应用到键值存储中。这是一种标准的复制方法,称之为状态机复制(state-machine-replication)。大多数数据系统,其背后如果像 Raft 这样的共识算法支撑,都是这样实现的。在这种情况下,键值存储中会保存一个整数的版本计数器。每次根据预写日志应用键值与值的写命令时,这个计数器都要递增。然后,它会根据递增之后的版本计数器,构建一个新的键值。这样一来,不存在值的更新,但每次写入请求都会向后面的存储中附加一个新的值。
class ReplicatedKVStore…
int version = 0;
MVCCStore mvccStore = new MVCCStore();
public CompletableFuture<Response> put(String key, String value) {
return server.propose(new SetValueCommand(key, value));
private Response applySetValueCommand(SetValueCommand setValueCommand) {
getLogger().info("Setting key value " + setValueCommand);
version = version + 1;
mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
Response response = Response.success(version);
return response;
能够快速定位到最佳匹配的版本,这是一个重要的实现考量,所以,有版本的价值常按这种方式进行组织:使用版本号当做键值的后缀,形成一个自然排序。这样就可以保持一个与底层数据结构相适应的顺序。比如说,一个键值有两个版本,key1 和 key2,key1 就会排在 key2 前面。
要存储有版本的键值与值,可以使用某种数据结构,比如,跳表,这样可以快速定位到最近的匹配版本上。使用 Java,可以像下面这样构建 MVCC 存储:
class MVCCStore…
public class MVCCStore {
NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>();
public void put(VersionedKey key, String value) {
kv.put(key, value);
为了使用 NavigableMap,有版本的键值可以像下面这样实现。它会实现一个比较器,允许键值的自然排序。
class VersionedKey…
public class VersionedKey implements Comparable<VersionedKey> {
private String key;
private int version;
public VersionedKey(String key, int version) {
this.key = key;
this.version = version;
public String getKey() {
return key;
public int getVersion() {
return version;
public int compareTo(VersionedKey other) {
int keyCompare = this.key.compareTo(other.key);
if (keyCompare != 0) {
return keyCompare;
return Integer.compare(this.version, other.version);
这个实现允许通过 NavigableMap 的 API 获取特定版本的值。
class MVCCStore…
public Optional<String> get(final String key, final int readAt) {
Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt));
return (entry == null)? Optional.empty(): Optional.of(entry.getValue());
看一个例子,一个键值有四个版本,存储的版本号分别是 1、2、3 和 5。根据客户端所使用版本去读取值,返回的是最接近匹配版本的键值。
图2:Put 请求处理 图3:读取特定版本有时,客户端需要获取从某个给定版本号开始的所有版本。比如,在状态监控中,客户端就要获取从指定版本开始的所有事件。
class IndexedMVCCStore…
public class IndexedMVCCStore {
NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>();
NavigableMap<VersionedKey, String> kv = new TreeMap<>();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
int version = 0;
public int put(String key, String value) {
try {
version = version + 1;
kv.put(new VersionedKey(key, version), value);
updateVersionIndex(key, version);
return version;
} finally {
private void updateVersionIndex(String key, int newVersion) {
List<Integer> versions = getVersions(key);
keyVersionIndex.put(key, versions);
private List<Integer> getVersions(String key) {
List<Integer> versions = keyVersionIndex.get(key);
if (versions == null) {
versions = new ArrayList<>();
keyVersionIndex.put(key, versions);
return versions;
这样,就可以提供一个客户端 API,读取从指定版本开始或者一个版本范围内的所有值。
class IndexedMVCCStore…
public List<String> getRange(String key, final int fromRevision, int toRevision) {
try {
List<Integer> versions = keyVersionIndex.get(key);
Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get();
Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey;
SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, revisionToRead), new VersionedKey(key, toRevision));
getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision);
return new ArrayList<>(versionMap.values());
} finally {
有一种替代的实现方案,就是将所有值的列表和键值存在一起,就像在Gossip 传播(Gossip Dissemination)中所用的一样,以规避不必要的状态交换
MVCC 与事务隔离
Transaction isolation levels, such as [snapshot-isolation], can be naturally implemented as well. When a client starts reading at a particular version, it's guaranteed to get the same value every time it reads from the database, even if there are concurrent write transactions which commit a different value between multiple read requests.
图4:读取快照使用 RocksDb 当做存储引擎
有一种很常见的数据存储的做法,就是使用 rocksdb 或类似的嵌入式存储引擎当做存储后端。比如,etcd 使用 boltdb,cockroachdb 早期使用 rocksdb,现在它用的是 RocksDb的一个 Go 语言的克隆版,称为 pebble。
class VersionedKeyComparator…
public class VersionedKeyComparator extends Comparator {
public VersionedKeyComparator() {
super(new ComparatorOptions());
public String name() {
return "VersionedKeyComparator";
public int compare(Slice s1, Slice s2) {
VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data()));
VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data()));
return key1.compareTo(key2);
使用 rocksdb 可以这么做:
class RocksDBMvccStore…
private final RocksDB db;
public RocksDBMvccStore(File cacheDir) throws RocksDBException {
Options options = new Options();
options.setComparator(new VersionedKeyComparator());
db = RocksDB.open(options, cacheDir.getPath());
public void put(String key, int version, String value) throws RocksDBException {
VersionedKey versionKey = new VersionedKey(key, version);
db.put(versionKey.serialize(), value.getBytes());
public String get(String key, int readAtVersion) {
RocksIterator rocksIterator = db.newIterator();
rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize());
byte[] valueBytes = rocksIterator.value();
return new String(valueBytes);
etcd3 使用的 MVCC 后端有一个单独的整数表示版本。
mongodb 和 cockroachdb 使用的 MVCC 后端有一个混合逻辑时钟。