etcd-raft Raftexample
Run
启动前先构建 raftexample
cd <directory>/src/go.etcd.io/etcd/contrib/raftexample
go build -o raftexample
启动一个三节点集群
raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380
每个 raftexample 程序都包含一个 Raft 实例和一个 k/v 服务,多个 peer 可以使用逗号分隔。
# 测试 K/V 的存储功能
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
curl -L http://127.0.0.1:12380/my-key
Design
raftexample 由三个组件组成
- K/V 存储, 存储所有已提交的键值对,充当 Raft Servers 和 REST server 之间通信的桥梁
- REST API server,REST server 通过访问 K/V store 来与 Raft servers 交流
- Raft Servers,当 Raft servers 达成共识后,可以接收 REST server 提交的 proposal 并传递给 peers。当 raft 达成共识后会将提交更新给 K/V store
(图源叉鸽)
REST API
raftexample 通过 serverHttpKVAPI
启动 REST server,具体启动服务的过程是对 http 库的简单的使用,不过多展示。我们只需要知道在启动的时候讲 httpKVAPI
注册给了 Handler
,并在请求到达时会调用 httpKVAPI.ServeHTTP()
方法即可
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//... ...
switch {
case r.Method == "PUT":
// ... ...
h.store.Propose(key, string(v))
// ... ...
case r.Method == "GET":
if v, ok := h.store.Lookup(key); ok {
// ... ...
}
// ... ...
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
// ... ...
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
// ... ...
case r.Method == "DELETE":
// ... ...
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
// ... ...
default:
// ... ...
}
}
GET
请求,调用 store 的Lookup()
方法查找PUT
请求,调用 store 的Propose()
提议一个 KV 数据POST/PUT
是配置变更请求,将请求通过 channel 传递给 raft 层处理
KV Store
KV store 是连接 raft servers 与 REST server 的桥梁,通过 kvstore
结构表示
// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
}
proposeC
:kvstore
通过这个 channel 将 propose 信息发送到 raft,raft 将对此进行 proposekvStore
: 存储已提交数据的 mapsnapshotter
:启动时可以通过快照恢复到原来的状态,具体过程在loadSnapshot()
和recoverFromSnapshot
方法中,这里不过多介绍
当从快照中恢复后,kvstore
会通过调用 readCommits()
方法监听 raft 层传递的提交信息
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for commit := range commitC {
if commit == nil {
// signaled to load snapshot
snapshot, err := s.loadSnapshot()
if snapshot != nil {
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
continue
}
for _, data := range commit.data {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
close(commit.applyDoneC)
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
commitC
,raft 层返回的 channel,当 raft 层就一个命令达成共识后,会向这个 channel 中发送信号errorC
,raft 层返回的 channel
raftexample 通过 nil 表明 Raft 层生成了快照,所以当
commitC
接收到 nil 值时需要检查快照是否存在,如果存在就必须进行加载。
当通过 commitC
接收到 raft 传递过来的提交的信号时,kvstore
就知道该值已经达成了共识,于是就可以将该 kv 对存储下来了
前面我们提到过,当 REST server 收到 GET
请求时会调用 store 的 Lookup()
方法查找数据,这个过程很简单,直接查找 kvstore
中存储的数据即可,不需要访问 raft 层。当 REST server 收到 PUT
请求时会调用 Propose()
方法提议一个命令,其实现如下:
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}
可以看到,store 层通过 proposeC
这个 channel 将封装好的 K/V 传递给了 Raft 层,所以接下来的 Raft 层才是真正的核心
Raft Node
这一部分是 raftexample 的核心,因为他告诉了我们该如何使用 etcd 的 Raft 模块。
Raft 节点通过 raftNode
表示,其结构如下所示:
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *commit // entries committed to log (k,v)
errorC chan<- error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
snapdir string // path to snapshot directory
getSnapshot func() ([]byte, error)
confState raftpb.ConfState
snapshotIndex uint64
appliedIndex uint64
// raft backing for the commit/error channel
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
stopc chan struct{} // signals proposal channel closed
httpstopc chan struct{} // signals http server to shutdown
httpdonec chan struct{} // signals http server shutdown complete
logger *zap.Logger
}
proposeC <-chan string
用于接收普通的 propose 通知confChangeC <- chan ConfChange
用于接收配置变更通知commitC chan<- *commit
,当 Raft 就某个命令达成共识时,通过commitC
通知上层 store 该命令可以 apply 了errorC chan<- error
,用于将错误报告给上层组件id
,该 raft node 的 idpeers
,集群中的其他 peerraftStorage
,Raft 的存储状态,即 RSM。Raft 提供了etcd/raft/storage.go
接口,用于定义操作 RSM 的方法node
Raft 节点- ......
raftNode
的详细启动过程可以在 newRaftNode
中查看,这里只大概梳理一下流程。raft 启动前,先会通过 replayWAL()
方法尝试重放 WAL 日志,重放完成后会通知上层 store 加载快照。恢复完成状态后就会构建与集群相关的信息
rpeers := make([]raft.Peer, len(rc.peers))
for i := range rpeers {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
c := &raft.Config{
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}
随后便会根据是否有 wal 来决定应该重启还是启动一个新节点。启动一个新节点相比重启多了一个 Bootstrap()
操作,用于初始化一个节点。
if oldwal || rc.join {
rc.node = raft.RestartNode(c)
} else {
rc.node = raft.StartNode(c, rpeers)
}
raft.Node
是 raftNode
的核心,因为它是 raftNode
与 raft 节点通信的唯一途径。这部分相关的代码这里不会讲解。
Raft 启动后需要一种方法来与集群中的 peers 进行通信,raftexample 中选择使用 etcdserver/api/rafthttp
中的 Transport
来处理。
最后,会通过 serveChannels()
方法监听 Raft 相关的活动并进行处理
// 监听停止事件
go rc.serveRaft()
// 监听 raft 相关的 channel,接收并处理
go rc.serveChannels()
func (rc *raftNode) serveChannels() {
snap, err := rc.raftStorage.Snapshot()
if err != nil {
panic(err)
}
rc.confState = snap.Metadata.ConfState
rc.snapshotIndex = snap.Metadata.Index
rc.appliedIndex = snap.Metadata.Index
defer rc.wal.Close()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// send proposals over raft
// 处理上层 store 的请求
go func() {}()
// event loop on raft state machine updates
// 处理下层 Raft 的通知
for {}
}
serveChannels()
方法有点长,这里分为 处理上层 store server 请求和处理下层 Raft 通知两部分来讲解
// send proposals over raft
// 监听 proposal与 confChange 并处理
go func() {
confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
上层的请求分为两种类型,一种是普通的 Propose 请求,一种是配置变更请求,对应于 REST Server 一节提到的 PUT
请求 和 POST/DELETE
请求。
对于 Propose 请求,会将 propose 消息通过 raft.node.Propose()
方法传递给下层的 Raft 集群处理。而对于配置变更请求,会通过 raft.node.ProposeConfChange()
方法传递给 Raft 处理。这部分的主要逻辑都与 Raft 的具体实现相关,这里不会进行讲解。
对于 Raft 传递上来的通知,其处理方法也是我们使用 etcd raft 模块时需要注意的部分
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
// ......
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
select
中有四种情况,一种是计时器超时,调用 raft.node.Tick()
。这个方法很重要,它相当于 Raft 的时钟,驱动着 Raft 运行。第二种情况是错误处理,第三种情况是停止时间,最后一种情况是处理 Raft 传递到上层的消息。
etcd raft 文档中提到了该如何处理 Raft 传递回来消息:
- 使用
Node.Ready()
获取一个Ready
channel 并从中读取数据 - 按顺序持久化 entries、
HardState
和Snapshot
- 将所有消息发送给指定的对象(
Message.To
) - Apply Snapshot
- 调用
Node.Advance
通知发送下一批更新
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
// 将 committed entries 写入 commit channel,并 apply
applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot(applyDoneC)
rc.node.Advance()
Ready
中包含了需要被读取和保存的 entries 和 messages
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
*SoftStete
表示当前节点的软状态,即不需要持久化的状态,包括 leader id 和 节点的 rolepb.HardState
表示当前节点的硬状态,即必须持久化的状态,包括 term、vote、commitReadStates
一个ReadState
数组切片,用于实现线性一致性,raftexample 中没有相关的处理逻辑Entries
,entry 数组,表示需要被保存的 entriesSnapshot
表示需要被保存的快照CommittedEntries
表示要被提交到状态机中的 etnriesMessages
表示要发送出去的消息MustSync
表示是否需要将HardState
和Enties
同步刷盘
结合文档中提到的处理消息的过程,可以发现这刚好就是处理 Ready
中各个字段的过程。所以可以将处理 Raft 消息总结为两个阶段:处理 Ready
阶段和调用 Advance
通知阶段。下面我们来看一下 raftexample 中是如何处理 Ready
的。
首先,通过 raftNode.wal.Save()
将 HardState
和 Entries
写入到 wal 中,这是为了在结点 crash 后也能够从中回复过来。
然后判断快照是否为空,如果快照不为空,就需要保存并应用快照,然后通过 commit channel 通知上层的 store server
将 HardState、Entries 和 Snapshot 写入稳定存储后就可以将 entries 加入到 Storage
中了。Storage
是一个接口,定义了一个存储层应该实现的功能,raftexample 使用的是 MemoryStorage
,这就意味着断电后其中的数据会全部消失。
接着会通过通信协议将 Ready
中的 messages 发送给指定的对象,因为他们也需要处理这些消息。然后就可以将已提交的 entries 通过 commit channel 发送给上层的 store 了。需要注意的是这里的 CommittedEntries
可能与 Storage
中已有的重复,这时就需要通过 raftNode.appliedIndex
进行判断了