Run

启动前先构建 raftexample

cd <directory>/src/go.etcd.io/etcd/contrib/raftexample
go build -o raftexample

启动一个三节点集群

raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380 

raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380

raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380

每个 raftexample 程序都包含一个 Raft 实例和一个 k/v 服务,多个 peer 可以使用逗号分隔。

# 测试 K/V 的存储功能
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello

curl -L http://127.0.0.1:12380/my-key

Design

raftexample 由三个组件组成

  • K/V 存储, 存储所有已提交的键值对,充当 Raft Servers 和 REST server 之间通信的桥梁
  • REST API server,REST server 通过访问 K/V store 来与 Raft servers 交流
  • Raft Servers,当 Raft servers 达成共识后,可以接收 REST server 提交的 proposal 并传递给 peers。当 raft 达成共识后会将提交更新给 K/V store

(图源叉鸽)

REST API

raftexample 通过 serverHttpKVAPI 启动 REST server,具体启动服务的过程是对 http 库的简单的使用,不过多展示。我们只需要知道在启动的时候讲 httpKVAPI 注册给了 Handler,并在请求到达时会调用 httpKVAPI.ServeHTTP() 方法即可

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	//... ...
	switch {
	case r.Method == "PUT":
		// ... ...
		h.store.Propose(key, string(v))    
		// ... ...
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			// ... ...
		}
		// ... ...
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		// ... ...
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc
		// ... ...
	case r.Method == "DELETE":
		// ... ...
		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc
		// ... ...
	default:
		// ... ...
	}
}
  • GET 请求,调用 store 的 Lookup() 方法查找
  • PUT 请求,调用 store 的 Propose() 提议一个 KV 数据
  • POST/PUT 是配置变更请求,将请求通过 channel 传递给 raft 层处理

KV Store

KV store 是连接 raft servers 与 REST server 的桥梁,通过 kvstore 结构表示

// a key-value store backed by raft
type kvstore struct {
	proposeC    chan<- string // channel for proposing updates
	mu          sync.RWMutex
	kvStore     map[string]string // current committed key-value pairs
	snapshotter *snap.Snapshotter
}
  • proposeCkvstore 通过这个 channel 将 propose 信息发送到 raft,raft 将对此进行 propose
  • kvStore: 存储已提交数据的 map
  • snapshotter:启动时可以通过快照恢复到原来的状态,具体过程在 loadSnapshot()recoverFromSnapshot 方法中,这里不过多介绍

当从快照中恢复后,kvstore 会通过调用 readCommits() 方法监听 raft 层传递的提交信息

func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
	for commit := range commitC {
		if commit == nil {
			// signaled to load snapshot
			snapshot, err := s.loadSnapshot()
			if snapshot != nil {
				if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
					log.Panic(err)
				}
			}
			continue
		}

		for _, data := range commit.data {
			var dataKv kv
			dec := gob.NewDecoder(bytes.NewBufferString(data))
			if err := dec.Decode(&dataKv); err != nil {
				log.Fatalf("raftexample: could not decode message (%v)", err)
			}
			s.mu.Lock()
			s.kvStore[dataKv.Key] = dataKv.Val
			s.mu.Unlock()
		}
		close(commit.applyDoneC)
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}
  • commitC,raft 层返回的 channel,当 raft 层就一个命令达成共识后,会向这个 channel 中发送信号
  • errorC,raft 层返回的 channel

raftexample 通过 nil 表明 Raft 层生成了快照,所以当 commitC 接收到 nil 值时需要检查快照是否存在,如果存在就必须进行加载。

当通过 commitC 接收到 raft 传递过来的提交的信号时,kvstore 就知道该值已经达成了共识,于是就可以将该 kv 对存储下来了

前面我们提到过,当 REST server 收到 GET 请求时会调用 store 的 Lookup() 方法查找数据,这个过程很简单,直接查找 kvstore 中存储的数据即可,不需要访问 raft 层。当 REST server 收到 PUT 请求时会调用 Propose() 方法提议一个命令,其实现如下:

func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String()
}

可以看到,store 层通过 proposeC 这个 channel 将封装好的 K/V 传递给了 Raft 层,所以接下来的 Raft 层才是真正的核心

Raft Node

这一部分是 raftexample 的核心,因为他告诉了我们该如何使用 etcd 的 Raft 模块。

Raft 节点通过 raftNode 表示,其结构如下所示:

// A key-value stream backed by raft
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v)
	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
	commitC     chan<- *commit           // entries committed to log (k,v)
	errorC      chan<- error             // errors from raft session

	id          int      // client ID for raft session
	peers       []string // raft peer URLs
	join        bool     // node is joining an existing cluster
	waldir      string   // path to WAL directory
	snapdir     string   // path to snapshot directory
	getSnapshot func() ([]byte, error)

	confState     raftpb.ConfState
	snapshotIndex uint64
	appliedIndex  uint64

	// raft backing for the commit/error channel
	node        raft.Node
	raftStorage *raft.MemoryStorage
	wal         *wal.WAL

	snapshotter      *snap.Snapshotter
	snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport
	stopc     chan struct{} // signals proposal channel closed
	httpstopc chan struct{} // signals http server to shutdown
	httpdonec chan struct{} // signals http server shutdown complete

	logger *zap.Logger
}
  • proposeC <-chan string 用于接收普通的 propose 通知
  • confChangeC <- chan ConfChange 用于接收配置变更通知
  • commitC chan<- *commit,当 Raft 就某个命令达成共识时,通过 commitC 通知上层 store 该命令可以 apply 了
  • errorC chan<- error,用于将错误报告给上层组件
  • id,该 raft node 的 id
  • peers,集群中的其他 peer
  • raftStorage,Raft 的存储状态,即 RSM。Raft 提供了 etcd/raft/storage.go 接口,用于定义操作 RSM 的方法
  • node Raft 节点
  • ......

raftNode 的详细启动过程可以在 newRaftNode 中查看,这里只大概梳理一下流程。raft 启动前,先会通过 replayWAL() 方法尝试重放 WAL 日志,重放完成后会通知上层 store 加载快照。恢复完成状态后就会构建与集群相关的信息

	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	c := &raft.Config{
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}

随后便会根据是否有 wal 来决定应该重启还是启动一个新节点。启动一个新节点相比重启多了一个 Bootstrap() 操作,用于初始化一个节点。

	if oldwal || rc.join {
		rc.node = raft.RestartNode(c)
	} else {
		rc.node = raft.StartNode(c, rpeers)
	}

raft.NoderaftNode 的核心,因为它是 raftNode 与 raft 节点通信的唯一途径。这部分相关的代码这里不会讲解。

Raft 启动后需要一种方法来与集群中的 peers 进行通信,raftexample 中选择使用 etcdserver/api/rafthttp 中的 Transport 来处理。

最后,会通过 serveChannels() 方法监听 Raft 相关的活动并进行处理

	// 监听停止事件
	go rc.serveRaft()
	// 监听  raft 相关的 channel,接收并处理
	go rc.serveChannels()

func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// send proposals over raft
	// 处理上层 store 的请求
	go func() {}()

	// event loop on raft state machine updates
	// 处理下层 Raft 的通知
	for {}
}

serveChannels() 方法有点长,这里分为 处理上层 store server 请求和处理下层 Raft 通知两部分来讲解

// send proposals over raft
// 监听 proposal与 confChange 并处理
go func() {
	confChangeCount := uint64(0)

	for rc.proposeC != nil && rc.confChangeC != nil {
		select {
		case prop, ok := <-rc.proposeC:
			if !ok {
				rc.proposeC = nil
			} else {
				// blocks until accepted by raft state machine
				rc.node.Propose(context.TODO(), []byte(prop))
			}

		case cc, ok := <-rc.confChangeC:
			if !ok {
				rc.confChangeC = nil
			} else {
				confChangeCount++
				cc.ID = confChangeCount
				rc.node.ProposeConfChange(context.TODO(), cc)
			}
		}
	}
	// client closed channel; shutdown raft if not already
	close(rc.stopc)
}()

上层的请求分为两种类型,一种是普通的 Propose 请求,一种是配置变更请求,对应于 REST Server 一节提到的 PUT 请求 和 POST/DELETE 请求。

对于 Propose 请求,会将 propose 消息通过 raft.node.Propose() 方法传递给下层的 Raft 集群处理。而对于配置变更请求,会通过 raft.node.ProposeConfChange() 方法传递给 Raft 处理。这部分的主要逻辑都与 Raft 的具体实现相关,这里不会进行讲解。

对于 Raft 传递上来的通知,其处理方法也是我们使用 etcd raft 模块时需要注意的部分

// event loop on raft state machine updates
for {
	select {
	case <-ticker.C:
		rc.node.Tick()

	// store raft entries to wal, then publish over commit channel
	case rd := <-rc.node.Ready():
		// ......
	case err := <-rc.transport.ErrorC:
		rc.writeError(err)
		return
	case <-rc.stopc:
		rc.stop()
		return
	}
}

select 中有四种情况,一种是计时器超时,调用 raft.node.Tick()。这个方法很重要,它相当于 Raft 的时钟,驱动着 Raft 运行。第二种情况是错误处理,第三种情况是停止时间,最后一种情况是处理 Raft 传递到上层的消息。

etcd raft 文档中提到了该如何处理 Raft 传递回来消息:

  1. 使用 Node.Ready() 获取一个 Ready channel 并从中读取数据
  2. 按顺序持久化 entries、HardStateSnapshot
  3. 将所有消息发送给指定的对象(Message.To)
  4. Apply Snapshot
  5. 调用 Node.Advance 通知发送下一批更新
	case rd := <-rc.node.Ready():
		rc.wal.Save(rd.HardState, rd.Entries)
		if !raft.IsEmptySnap(rd.Snapshot) {
			rc.saveSnap(rd.Snapshot)
			rc.raftStorage.ApplySnapshot(rd.Snapshot)
			rc.publishSnapshot(rd.Snapshot)
		}
		rc.raftStorage.Append(rd.Entries)
		rc.transport.Send(rd.Messages)
		// 将 committed entries 写入 commit channel,并 apply
		applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
		if !ok {
			rc.stop()
			return
		}
		rc.maybeTriggerSnapshot(applyDoneC)
		rc.node.Advance()

Ready 中包含了需要被读取和保存的 entries 和 messages

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
  • *SoftStete 表示当前节点的软状态,即不需要持久化的状态,包括 leader id 和 节点的 role
  • pb.HardState 表示当前节点的硬状态,即必须持久化的状态,包括 term、vote、commit
  • ReadStates 一个 ReadState 数组切片,用于实现线性一致性,raftexample 中没有相关的处理逻辑
  • Entries,entry 数组,表示需要被保存的 entries
  • Snapshot 表示需要被保存的快照
  • CommittedEntries 表示要被提交到状态机中的 etnries
  • Messages 表示要发送出去的消息
  • MustSync 表示是否需要将 HardStateEnties 同步刷盘

结合文档中提到的处理消息的过程,可以发现这刚好就是处理 Ready 中各个字段的过程。所以可以将处理 Raft 消息总结为两个阶段:处理 Ready 阶段和调用 Advance 通知阶段。下面我们来看一下 raftexample 中是如何处理 Ready 的。

首先,通过 raftNode.wal.Save()HardStateEntries 写入到 wal 中,这是为了在结点 crash 后也能够从中回复过来。

然后判断快照是否为空,如果快照不为空,就需要保存并应用快照,然后通过 commit channel 通知上层的 store server

将 HardState、Entries 和 Snapshot 写入稳定存储后就可以将 entries 加入到 Storage 中了。Storage 是一个接口,定义了一个存储层应该实现的功能,raftexample 使用的是 MemoryStorage,这就意味着断电后其中的数据会全部消失。

接着会通过通信协议将 Ready 中的 messages 发送给指定的对象,因为他们也需要处理这些消息。然后就可以将已提交的 entries 通过 commit channel 发送给上层的 store 了。需要注意的是这里的 CommittedEntries 可能与 Storage 中已有的重复,这时就需要通过 raftNode.appliedIndex 进行判断了