本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。
文章目录
引言
单一的Raft算法我们必然无法去承载极大规模的数据,因为所有的数据都需要排队,然后走一遍Raft的流程,我们无法做到多个操作之间的并行执行,所以Multi-Raft这样的概念就显得非常迫切。仍旧是shard的概念,对keyspace内的key进行某种规则(hash,range)上的划分,然后一个范围看作一个Raft Group,我们称为Region。这些Region的leader可以分布在不同的物理机器上,这样不同的范围的key就可以并行的在不同的Region内执行了。此时引来了一个问题,就是热点的问题,有些Reigion的数据可能非常多,有些却非常少,我们就这样看着它们去两级分化吗,当然不是,我们希望在某个Reigon数据量较大的时候把其分成两个或两个以上的Region,并且把较小的Region合并成一个Region。这其实就是Split与Merge操作。
这篇文章主要阐述TiKV中Region之间的Split与Merge。文章第一部分探究这两类操作的触发时机,后面看看两类操作的实际处理过程,最后说说一些细节的地方。
Region epoch
首先引入Region epoch的原因是去判断不同Region config的新旧,其定义非常简单,其protobuf定义如下:
message RegionEpoch {
// Conf change version, auto increment when add or remove peer
uint64 conf_ver = 1;
// Region version, auto increment when split or merge
uint64 version = 2;
}
更新规则如下:
- 配置变更的时候, conf_ver + 1。
- Split 的时候,原 region 与新 region 的 version均等于原 region 的 version 加上新 region 个数。
- Merge 的时候,两个 region 的 version均等于这两个 region 的 version最大值 + 1。
以上步骤推出的结论就是当两个Region拥有的范围出现重叠的时候,比较两个Region的version就可以直到它们的先后关系了,因为但凡出现配置改变,version一定是单调递增的。
Split
触发时机
这里有两个触发时机:
- 每个Region定期检查自己是否需要分裂
- PD定期检查
split_checker_worker
这是一个后台任务,每隔split-region-check-tick-interval(默认 10s)的时间触发一次split检查,如果通过了所有的检查,就会向自己去proposal一条Split请求,从而执行分裂。
检查的流程如下:
- 调用 coprocessor::new_split_checker_host获取 SplitCheckerHost,获取时会对每一个注册过的 split_check_observers调用 add_checker,若满足触发阈值则会把它的 split_check加入 SplitCheckerHost::checkers中,如果 checkers为空则结束检查。
- 获取不同的策略,存在SCAN和 APPROXIMATE。
- 如果是SCAN的话,我们会遍历RocksDB中该Region的所有Default域内的key,调用每个 split_checker的 on_kv计算 Split key是否超过阈值应该执行。
下面是tinykv中这部分的代码,可以看到SCAN了RocksDB中的键,并执行on_kv计算,如果超过限制的话就会返回一个splitkey,此时就会利用这个key生成一个MsgTypeSplitRegion
类型的数据,然后proposal出去,其他节点接收到这条消息以后就会apply。这里的逻辑后面说。
func (r *splitCheckHandler) splitCheck(regionID uint64, startKey, endKey []byte) []byte {
txn := r.engine.NewTransaction(false)
defer txn.Discard()
r.checker.reset()
it := engine_util.NewCFIterator(engine_util.CfDefault, txn)
defer it.Close()
for it.Seek(startKey); it.Valid(); it.Next() {
item := it.Item()
key := item.Key()
if engine_util.ExceedEndKey(key, endKey) {
// update region size
r.router.Send(regionID, message.Msg{
Type: message.MsgTypeRegionApproximateSize,
Data: r.checker.currentSize,
})
break
}
if r.checker.onKv(key, item) {
break
}
}
return r.checker.getSplitKey()
}
func (checker *sizeSplitChecker) onKv(key []byte, item engine_util.DBItem) bool {
valueSize := uint64(item.ValueSize())
size := uint64(len(key)) + valueSize
checker.currentSize += size
if checker.currentSize > checker.splitSize && checker.splitKey == nil {
// 记录了该返回的splitkey
checker.splitKey = util.SafeCopy(key)
}
return checker.currentSize > checker.maxSize
}
PD调度
乍看起来每个Region就可以决定是否自己可以被Split了,为什么还需要引入PD来触发Split呢。在 TiDB 中新建一个表后,默认会单独切分出 1 个 Region 来存储这个表的数据,这个默认行为由配置文件中的 split-table 控制。但是可能某些表可能可预料到到会成为一个热点,虽然默认的行为也可以在后来使其分裂,但如果开始就有多个表的话就可以减少TiKV的抖动情况,这就是预分裂,即可以根据指定的参数,预先为某个表切分出多个 Region,并打散到各个 TiKV 上去。
这个过程由PD来完成。事实上PD有很多的调度策略[4],都存在coordinator.schedulers中。可以根据不同机器的Region size,Leader size等等来实现调度,当然后面要提到的merge也是由PD来完成的。
执行过程
首先执行过程起源于前面的MsgTypeSplitRegion
消息,当Storege收到这条消息后会对自身状态进行一个检查:
if len(splitKey) == 0 {
err := errors.Errorf("%s split key should not be empty", d.Tag)
log.Error(err)
return err
}
if !d.IsLeader() {
// region on this store is no longer leader, skipped.
log.Infof("%s not leader, skip", d.Tag)
return &util.ErrNotLeader{
RegionId: d.regionId,
Leader: d.getPeerFromCache(d.LeaderId()),
}
}
region := d.Region()
latestEpoch := region.GetRegionEpoch()
// This is a little difference for `check_region_epoch` in region split case.
// Here we just need to check `version` because `conf_ver` will be update
// to the latest value of the peer, and then send to Scheduler.
if latestEpoch.Version != epoch.Version {
log.Infof("%s epoch changed, retry later, prev_epoch: %s, epoch %s",
d.Tag, latestEpoch, epoch)
return &util.ErrEpochNotMatch{
Message: fmt.Sprintf("%s epoch changed %s != %s, retry later", d.Tag, latestEpoch, epoch),
Regions: []*metapb.Region{
region},
}
}
return nil
基本上就是检查自身是否还是leader以及当前的version是否已经发生了变化,因为如果version变化,这个splitkey就不一定是有效的了。
通过检查以后就是向PD请求一个新的regionID和peersID[],然后利用这些数据执行一条AdminCmdType_Split
类型的请求,随后这条数据就会像一条普普通通的Raft Log一样散播到其他副本上了。在commit以后需要让ApplyFsm去处理这条消息,这个处理的过程其实就是基于前面的splitkey去分割Region,当然只是在本机而已,这样我们要做的事情其实就是修改元数据而已。具体需要做的事情如下:
- 更新旧region的version信息
- 利用version,splitkey,peers等信息创建newregion
- 更新本地的region相关的信息
- 是leader的话还需要向PD报告(有可能是是region发起的split)
other
此时我们可以发现Split操作只是在本机而已,如果我们想要让这两个Region不存在同一个机器上怎么办呢,其实就是依靠PD的调度,因为TiKV实现了TransferLeader
,changepeer
这样的原语,所以在本机Split一个Region以后,后面可以很容易的把leader调度到其他机器或者把分裂的两个region的peers全部都置于不同的机器中,这主要取决于调度策略。
Merge
不同于Split只需要简单的proposal一条Raft Log,Merge中我们要合并的数据属于两个Raft Group上,而且可能与Split,SnapShot相互作用,我们还需要考虑网络隔离带来的问题,所以Merge的难度是比较高的。
其触发操作仍然由PD完成,触发条件如下:
- 如果 Region 的大小大于 max-merge-region-size(默认 20MB)或者 key 数量大于 max-merge-region-keys(默认 200000)不会触发 Merge。
- 对于新 Split 的 Region 在一段时间内 split-merge-interval(默认 1h)不会触发 Merge。
- 确保 Source Region 与 Target Region 的所有 Peer 都存在,该信息是通过 Region Leader 上报 PD 获得。
这里要说的是,Merge之前PD会把所有的peer通过Conf Change对其到同一台机器上,然后再去发送Merge。
整个过程在PD向Leader发送消息后分为PrepareMerge和CommitMerge两步。
PrepareMerge
PD发送的消息中附带着Target Region 的 Region 信息,在PrepareMerge请求实际被Propose之前,实际会执行一些检查:
- 本地的 Target Region 的 Epoch 跟 Region 信息中的一致
- 如果一致,检查是否是范围相邻 Region
- 是否所有对应 Peer 都在相同的 Store 上
- 所有 Follower 落后的日志数目小于 merge-max-log-gap(避免网络隔离,这部分数据会被广播,所以控制下大小)
- 从所有 Follower 上最小的 commit index 对应的日志到当前最后一条日志之间没有CompactLog/Split/Merge/Conf Change命令。
如果以上条件都被满足以后发送PrepareMergeRequest,其protobuf定义如下:
message PrepareMergeRequest {
uint64 min_index = 1; //所有 Follower 中最小的 match index
metapb.Region target = 2; // Target Region 信息
}
这个请求Apply时修改两个数据:
- 修改 RegionLocalState中 PeerState 为 Merging。
- 修改 RegionLocalState中 MergeState,存入 min_index,target,commit 信息,其中 commit 为 PrepareMerge对应的 index。
然后此时每一个Apply了PrepareMerge的Source peer都会定时触发merge tick,主要做了以下的检查:
- 首先我们要比较本机存储的Target Peer信息(称为A)的与前面存储在MergeState中的peer信息(称为B),这里可能出现A中的Epoch大于B的Epoch或者A根本不存在。有两种可能性:
- PD发生了调度,此时A的Epoch是要高的(merge tick是一个定时任务),我们需要rollback。
- 本地的 Target Peer 在随后通过 Conf Change 被移除了,也需要rollback。(配置不一样没办法commit)
- 如果A小于B的话说明此peer消息还未同步完全,继续等待
- 相等则进行下一步
- 给本地的target region发送一条CommitMerge消息,其protobuf定义如下:
message CommitMergeRequest {
metapb.Region source = 1;
uint64 commit = 2; // MergeState 中的 commit
repeated eraftpb.Entry entries = 3; // MergeState 中的 min_index + 1 到 commit 的 Raft Log。
}
这里把entries的消息发送给所有的节点是为了在切主时leader也能收到所有落后的消息。此时所有的涉及的机器上的source peer和target peer应该是配置相同,日志相同的,这个过程中source region一直无法提供服务。
CommitMerge
target region收到这条消息以后需要执行Merge逻辑:
- 本机器上Source Region的日志可能不够,我们利用entries更新日志,entries是利用所有peers中最低的commit index取的,所以日志一定是够的。
- 修改本机器上source region的peer为Tombstone。
- 修改target Region的range和version信息,持久化Localregion消息。
- 此时target region代表新的range提供服务。
other
这里其实[2]中对于正确性的论证是很有意思的,即如果需要Merge成功,我们至少需要确保:Target Peer 在 Apply CommitMerge的时候,本地一定存在对应的 Source Peer。
显然如果在CommitMerge消息中附带着Source Peer所有的数据就很简单了,但是不太现实,所以我们需要考虑conf change的情况,
考虑下面的日志分布:PrepareMerge|---A---|CommitMerge|---B---|
即如果出现这样的日志分布,证明A部分一定不会出现涉及此Region的conf change情况。对于B部分的Conf Change来说,显然等到其Apply的时候CommitMerge也Apply过了,所以重点其实就是在A中出现conf change的时候rollback。
[2]中对于lease的讨论没太看明白,source region不是在Apply CommitMerge之前是停止服务的吗,为什么还要考虑租约呢。
总结
其实主要这篇文章还是想把我以前学习时只知道是什么的东西通过代码的途径细致的学习后输出出来,以前只知道全局的调度器需要在某些时候把某些Raft Group以某种方式执行合并,这种疑问早在一年多以前看GFS论文的时候就已经存在了,当然我也只是基于TinyKV的代码去阐述,但好在和TiKV在逻辑上基本上大差不差,只是细节方面和功能方面有一点点阉割而已,学习原理还是绰绰有余了。
参考: