TinyKV Course: RaftStore源码解析
TinyKV RaftStore源码解析
TinyKV Course的Lab2B要求我们基于Raft Module实现一个key-value存储服务,在涉及到的模块中最重要的就是raftstore
。 因此我们先对其相关的代码进行阅读,本文是代码阅读的总结,可能会随着后面阅读的深入继续更新。
Overview
raftstore
是TinyKV中Raft存储的核心部分,它将raft::RawNode
, storage
, Transport
等部分拼接在了一起。并且由于TiKV采用的是Multi-Raft, 一台服务机器上可能会运行多个raft peer, 这些不同region的raft peer都被同一个raftstore
所管理。
上图展示了raftstore
大概的结构,这里并没有展示一个raftstore
结构体中的所有数据成员,而是选择了一些较为关键的部分。包括如下部分:
router
: 由于一个raftstore
中可能运行着多个raft peer,所以raftstore
需要对收到的message按照region id进行分发,每个raft peer独立地处理自己这个raft region中的数据。router结构就起到这个分发的作用。GlobalContext
:保存了raftstore运行时的一些上下文信息,这里展示了两个最重要的部分:Transport
用于发送raft message给其他raftstore,Engines
包含了两个持久化的key-value数据库,一个用于持久化raft log,一个作为raft的状态机。因为raftstore里面有多个来自不同region的raft peer,所以需要在RaftDB中进行区分,区分的方式是使用region_id
作为key的prefix。Ticker
: 周期性地发送时钟tick信息,用于驱动raft peer的状态向前更新。RaftWorker
: 一个周期性运行的worker,它会在一个for loop中做以下工作:查询是否有其他raft server发来的消息,如果有则对其进行处理:NewMsgHandler()
; 对每个raft peer进行状态更新,持久化log entry,apply entry到state machine等。Workers
: 一些其他相关的Worker
, 例如raft需要定期做log compaction等动作。
Source Code Overview
在正式开始解析代码之前,我们需要先对每个模块包含了哪些类,以及每个类包含了哪些成员数据进行调查。经过阅读,相关的类以及各自的成员如下:
在上面的UML图中,我们将raftstore
划分为了四个模块:
Ticker Module
: 作为整个raftstore的ticker模块,不断发出tick信号来驱动整个raftstore进行对应的状态更新等操作。raft module
: 作为整个raftstore最核心的部分,raft module的核心数据结构是peer
, 一个peer是对raft.RawNode
的进一步封装。包含了存储模块,snapshot模块等内容。RaftWorker
根据tick信号来不断更新raft模块中每个raft peer的状态。storage module
: 包含了PeerStorage
, 这是一个对raft.Storage
接口的实现,用于读取和持久化raft相关的持久化数据。其底层实现采用了一个持久化键值数据库badger.DB
GloablContext
: 包含了各个模块在运行时需要的一些上下文信息,例如raft peer在发送消息时需要Transport
来发送。
综上所述,raftstore
就是将上述各个模块链接在一起的adapter, 它将各个部分初始化,例如从底层的存储系统中读取之前已经存在的raft peer状态,初始化状态机等;然后创建一个RaftWorker
持续在TickDriver
的驱动下对各个部分进行更新操作。
Ticker Module
Data structure
我们首先对Ticker Module进行梳理,因为这部分代码相对比较简单,和其他模块的耦合也比较松。可以单独拿出来作为一个独立的模块。Ticker在整个Raft模块中的作用就是每过一段时间就发出一个时钟信号,由这个时钟信号来驱动Raft,snapshot,Log GC等模块的工作向前运作。
TickerModule模块的内容如下:
type tickDriver struct {
baseTickInterval time.Duration
newRegionCh chan uint64
regions map[uint64]struct{}
router *router
storeTicker *ticker
}
type ticker struct {
regionID uint64
tick int64
schedules []tickSchedule
}
type tickSchedule struct {
runAt int64
interval int64
}
每个域的作用如下:
baseTickInterval
说明了这个TickDriver
每过多少时间需要发出一个Tick信号。newRegionCh
用于接收是否有一个newRegion
的请求。regions
保存了在这个server上运行的所有raft peer的RegionID。router
提供了send
接口,tickDriver可以使用该接口向一个指定的region的peer发出tick消息;storeTicker
保存了和store相关的元数据,在raftstore中,除了Raft模块本身需要周期性地获取tick消息向前更新状态,一些store活动也需要向前执行, 例如Snapshot, LogGC等。这些活动也都是周期性进行的,所以需要一个tick。storeTicker
保存了这些活动的tick元数据,包括:每多少个tick触发一次;现在已经执行了多少个tick了。
对于ticker
, 它包含了一个tickSchedule
数组,该数组使用一个int类型来索引,其值实际上枚举了几种store活动的情况:
const (
PeerTickRaft PeerTick = 0
PeerTickRaftLogGC PeerTick = 1
PeerTickSplitRegionCheck PeerTick = 2
PeerTickSchedulerHeartbeat PeerTick = 3
)
const (
StoreTickSchedulerStoreHeartbeat StoreTick = 1
StoreTickSnapGC StoreTick = 2
)
每个tickSchedule
包含了一个interval和一个runAt元素,interval
用于表示该活动应该每多少个tick执行一次;runAt
表示该活动下一轮应该在哪个tick时运行。当一个store动作被执行之后,可以通过runAt = tick + interval
来更新该store动作的下一次运行时间。这实际上就是ticker.Schedule()
的执行流程。
Ticker Running
TickDriver
通过run
函数来启动一个ticker
, 这个ticker将会在一个死循环中运行下去,每过一段时间发出一个tick信号给router中的raft peer,然后检查是否有对应的store活动可以可以执行:
func (r *tickDriver) run() {
timer := time.Tick(r.baseTickInterval)
for {
select {
case <-timer:
for regionID := range r.regions {
if r.router.send(regionID, message.NewPeerMsg(message.MsgTypeTick, regionID, nil)) != nil {
delete(r.regions, regionID)
}
}
r.tickStore()
case regionID, ok := <-r.newRegionCh:
if !ok {
return
}
r.regions[regionID] = struct{}{}
}
}
}
run
函数创建一个timer
, 这个timer
是一个channel,另一端会每过一段时间就发出一个一个time结构体。在run
函数中,使用一个死循环来不断接收来自这个timer的消息。而每经过baseTickInterval
的时间,tickDriver
就会给router
中保存的每个peer发出一个MsgTypeTick
消息,使得Raft能知道已经经过了一个tick了,而RaftWorker
也会在一个死循环中检查当前是否有tick消息,然后调用raft.RawNode
的step来使得raft的状态得到更新。
每经过一个tick
,TickDriver
会在storeTicker
中调用tickStore
来检查是否有可执行的store活动, 即tickStore
函数:
func (r *tickDriver) tickStore() {
r.storeTicker.tickClock()
for i := range r.storeTicker.schedules {
if r.storeTicker.isOnStoreTick(StoreTick(i)) {
r.router.sendStore(message.NewMsg(message.MsgTypeStoreTick, StoreTick(i)))
}
}
}
isOnStoreTick
就是检查之前提到的tickSchedule
中每个store动作的runAt是否达到,如果达到,则通过sendStore
发起一个store动作调用。
Storage Module
接下来我们介绍storage module, storage module
主要实现了一个类PeerStorage
, 作为raft模块的底层持久化存储部分,该类实现了raft.Storage
这一接口。
PeerStorage data structure
type PeerStorage struct {
// current region information of the peer
region *metapb.Region
// current raft state of the peer
raftState *rspb.RaftLocalState
// current apply state of the peer
applyState *rspb.RaftApplyState
// current snapshot state
snapState snap.SnapState
// regionSched used to schedule task to region worker
regionSched chan<- worker.Task
// generate snapshot tried count
snapTriedCnt int
// Engine include two badger instance: Raft and Kv
Engines *engine_util.Engines
// Tag used for logging
Tag string
}
metapb.Region
包含了该raft peer的region信息,包括该region的Id,每个peer的id和storeId等(StoreID说明了peer在哪台机器上)。这是因为一台机器上可能有多个raft peer, 它们都一起存储在一个kv中,所以需要在存储的时候加上regionId作为前缀。rspb.RaftLocalState
和raspy.RaftApplyState
存储了一个raft peer的状态,如下所示:type RaftLocalState struct { HardState *eraftpb.HardState LastIndex uint64 LastTerm uint64 } type HardState struct { Term uint64 Vote uint64 Commit uint64 } type RaftApplyState struct { // Record the applied index of the state machine to make sure // not apply any index twice after restart. AppliedIndex uint64 // Record the index and term of the last raft log that have been truncated. (Used in 2C) TruncatedState *RaftTruncatedState } type RaftTruncatedState struct { Index uint64 Term uint64 }
其中
RaftLocalState.LastIndex
和RaftLocalState.LastTerm
记录了该storage中最后一个log entry的index和term。HardState
记录了Raft论文中提到的必须要持久化的部分,包括一个raft peer当前的term,votefor。raft的CommitIndex并不要求持久化,但是在state machine被持久化的情况下commitIndex和applyIndex都需要被持久化来避免重复的apply和commit。RaftTruncatedState
是关于一个raft peer在做snapshot时的状态,它记录了最后一个被截断的log entry的index和term。snap.SnapState
是关于snapshot的,这部分暂时省略。engine_util.Engines
封装了两个key-value store,一个用于保证raft状态的持久化,一个用于作为raft状态机。
Persistence Procedure
由于这里的engine需要存储来自不同region的raft peer的状态,所以需要在存储时对key增加额外的内容来区分这些状态是来自不同的raft peer,这些内容在存储时的格式如下:
Key | Key Format | Value | DB |
---|---|---|---|
raft_log_key | 0x01 0x02 region_id 0x01 log_idx |
Entry | Raft |
raft_state_key | 0X01 0X02 region_id 0x02 |
RaftLocalState | Raft |
apply_state_key | 0x01 0x02 region_id 0x03 |
RaftApplyState | kv |
region_state_key | 0x01 0x03 region_id 0x01 |
RegionLocalState | Kv |
下面分析PeerStorage相关的代码,主要包含两个方面:1. PeerStorage的创建与初始化;2. PeerStorage对raft状态(包括log)的持久化过程。
Object Creation & initialization
为了创建一个PeerStorage
结构,需要从Engine
中尝试读出之前已经存在的raft_local_state
和apply_state
。整个初始化的过程如下:
NewPeerStorage
会从engine
中分别读取RaftLocalState
和RaftApplyState
,并将它们作为PeerStorage
构造函数的一部分存储在PeerStorage
对象中。因此,当raft模块调用PeerStorage
的InitState
函数时,它只是简单地将局部存储的raftLocalState
和ConfState
返回给上层。
initRaftLocalState
会构造一个初始化的RaftLocalState
, 这包含两个方面:1. 直接从raftDB中以raft_state_key
作为key读取对应的value数据; 2. 如果raftDB中没有这样一个key, 则说明这个raft peer是第一次启动,需要做初始化。初始化时需要对raft的状态进行设置,这里设置raft的term为RaftInitLogTerm
, 即为5,设置raft
的log的lastIndex为RaftInitLogIndex
, 其值也为5。然后将初始化得到的状态调用PutMeta
进行初始化。initApplyState()
做类似的操作,这里省略。
经过上面的执行流程,我们得到了一个经过初始化的PeerStorage
, 它从底层的engine中读取之前的raftState和applyState,或者自己初始化上述必要的内容。
Persistence & read
Storage
的主要功能是为raft模块提供持久化的写入和读取功能。由于Lab2B要求我们实现PeerStorage的持久化功能,这里我们先解析其读取代码。然后根据读取操作的代码反推写入数据的格式和流程。
PeerStorage
中与读取操作有关的函数有以下几个:
func (ps *PeerStorage) Entries(low, high uint64) ([]eraftpb.Entry, error);
func (ps *PeerStorage) Term(idx uint64) (uint64, error);
func (ps *PeerStorage) InitialState() (eraftpb.HardState, eraftpb.ConfState, error);
其中InitialState
基本上就是直接返回PeerStorage
中存储的raft状态,ConfState
也是直接返回Region
的信息,所以这部分代码没有过多需要描述的。
Term
Term
接口对于给定的一个raft index,返回该index对应entry的term。在执行时,该函数首先需要检查给定的index是否在一个合法的范围内:即它没有超出目前已知的最大的raft index,也没有被截断(compacted):
func (ps *PeerStorage) Term(idx uint64) (uint64, error) {
// Check if this index is the last compacted index
if idx == ps.truncatedIndex() {
return ps.truncatedTerm(), nil
}
// check if entries contain a range [idx, idx + 1)
if err := ps.checkRange(idx, idx+1); err != nil {
return 0, err
}
// Otherwise the index is stored in raftDB, but an acceleration is to simply return lastTerm when
// truncated term equals to lastTerm.
if ps.truncatedTerm() == ps.raftState.LastTerm || idx == ps.raftState.LastIndex {
return ps.raftState.LastTerm, nil
}
// Otherwise, read this entry from raftDB
var entry eraftpb.Entry
if err := engine_util.GetMeta(ps.Engines.Raft, meta.RaftLogKey(ps.region.Id, idx), &entry); err != nil {
return 0, err
}
return entry.Term, nil
}
// checkRange checks if specified range [low, high) is contained in this log
func (ps *PeerStorage) checkRange(low, high uint64) error {
if low > high {
return errors.Errorf("low %d is greater than high %d", low, high)
} else if low <= ps.truncatedIndex() {
return raft.ErrCompacted
} else if high > ps.raftState.LastIndex+1 {
return errors.Errorf("entries' high %d is out of bound, lastIndex %d",
high, ps.raftState.LastIndex)
}
return nil
}
上面的Term
实现分析了给定index的几种情况,然后分别返回错误码或者结果。这里在12行的位置有一个优化的方案,当给定的raft index确实在raft log范围内时,并不是一定要从raftKV中读取该entry。首先判断truncatedTerm和lastTerm是否相等,如果相等,那么这两个index之间的所有entry的term都是相同的,所以可以直接返回。
这里需要说明的是,上述代码在做检查时使用的lastIndex
, lastTerm
等数据都是保存在PeerStorage
object中的,所以当PeerStorage
调用持久化相关的函数时,需要实时更新这些域。
Entries
Entries
函数会给定一个传入的index的range, 由[low, high)
来指定,然后它将返回index在这个范围内的所有的entry。值得注意的一点是,如果这个范围是非法的(通过checkRange
来检查),那么会直接返回错误码,而不是去返回low~lastIndex这个范围之间的entry。如果通过检查,Entries
会在raftDB
中构造一个迭代器,不断取出给定范围内的数据:
func (ps *PeerStorage) Entries(low, high uint64) ([]eraftpb.Entry, error) {
...
startKey := meta.RaftLogKey(ps.region.Id, low)
endKey := meta.RaftLogKey(ps.region.Id, high)
iter := txn.NewIterator(badger.DefaultIteratorOptions)
defer iter.Close()
for iter.Seek(startKey); iter.Valid(); iter.Next() {
item := iter.Item()
if bytes.Compare(item.Key(), endKey) >= 0 {
break
}
val, err := item.Value()
if err != nil {
return nil, err
}
var entry eraftpb.Entry
// Unmarshal is the same as deserialize, which converts a serialized bytes into a structured log entry
if err = entry.Unmarshal(val); err != nil {
return nil, err
}
// May meet gap or has been compacted.
if entry.Index != nextIndex {
break
}
nextIndex++
buf = append(buf, entry)
}
...
}
由于这里startKey
的构造方式与上面提到的key的raft log key的格式保持一致。但是值得注意的是在写入uint64这样的数据时采用的是big endian的方式,这样可以保证按字节比较的顺序(即raftDB比较key的方式)与index按照uint64的比较方式相同:
func makeRegionKey(regionID uint64, suffix byte, subID uint64) []byte {
key := make([]byte, 19)
key[0] = LocalPrefix
key[1] = RegionRaftPrefix
binary.BigEndian.PutUint64(key[2:], regionID)
key[10] = suffix
binary.BigEndian.PutUint64(key[11:], subID)
return key
}
而写入的value则是这个entry序列化之后的结果,通过Unmarshal
以及Marshal
可以将一个对象在它的类型与字节数组之间进行转换。Marshal
可以得到写入该raftDB的value,Unmarshal
可以从读取得到的字节数组重建该数据结构。
Summary
本文梳理了TinyKV中raftstore
结构的构成以及与之相关的代码,并介绍了Ticker Module
和Storage Module
两部分的代码。接下来会讲述Raft Module
的代码,重点在于raftstore
的创建与初始化,以及运行时RaftWorker
是如何驱动raftstore的数据结构在时钟和外部消息输入的情况下更新raft状态。