etcd-raft Overview
前面提到过,用户可以通过 StartNode
或者 RestartNode
启动或者重启一个 Raft 节点,StartNode
方法如下所示:
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)
n := newNode(rn)
go n.run()
return &n
}
其中 NewRawNode
返回一个 RawNode
类型的节点,表示 raft 相关结构,并且该结构会被封装到 node
结构中并供上层使用者操作(比如 Propose 一个新命令或者新配置、Tick 时钟,接收 Ready
, Advance
)。node
实现了 Node
接口,实现其定义的相关方法
Node
// Node represents a node in a raft cluster.
type Node interface {
// Tick increments the internal logical clock for the Node by a single tick. Election
// timeouts and heartbeat timeouts are in units of ticks.
Tick()
// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
Campaign(ctx context.Context) error
// Propose proposes that data be appended to the log. Note that proposals can be lost without
// notice, therefore it is user's job to ensure proposal retries.
Propose(ctx context.Context, data []byte) error
// ProposeConfChange proposes a configuration change. Like any proposal, the
// configuration change may be dropped with or without an error being
// returned. In particular, configuration changes are dropped unless the
// leader has certainty that there is no prior unapplied configuration
// change in its log.
//
// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
// message. The latter allows arbitrary configuration changes via joint
// consensus, notably including replacing a voter. Passing a ConfChangeV2
// message is only allowed if all Nodes participating in the cluster run a
// version of this library aware of the V2 API. See pb.ConfChangeV2 for
// usage details and semantics.
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
// Ready returns a channel that returns the current point-in-time state.
// Users of the Node must call Advance after retrieving the state returned by Ready.
//
// NOTE: No committed entries from the next Ready may be applied until all committed entries
// and snapshots from the previous one have finished.
Ready() <-chan Ready
// Advance notifies the Node that the application has saved progress up to the last Ready.
// It prepares the node to return the next available Ready.
//
// The application should generally call Advance after it applies the entries in last Ready.
//
// However, as an optimization, the application may call Advance while it is applying the
// commands. For example. when the last Ready contains a snapshot, the application might take
// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
// progress, it can call Advance before finishing applying the last ready.
Advance()
// ApplyConfChange applies a config change (previously passed to
// ProposeConfChange) to the node. This must be called whenever a config
// change is observed in Ready.CommittedEntries, except when the app decides
// to reject the configuration change (i.e. treats it as a noop instead), in
// which case it must not be called.
//
// Returns an opaque non-nil ConfState protobuf which must be recorded in
// snapshots.
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
// TransferLeadership attempts to transfer leadership to the given transferee.
TransferLeadership(ctx context.Context, lead, transferee uint64)
// ReadIndex request a read state. The read state will be set in the ready.
// Read state has a read index. Once the application advances further than the read
// index, any linearizable read requests issued before the read request can be
// processed safely. The read state will have the same rctx attached.
// Note that request can be lost without notice, therefore it is user's job
// to ensure read index retries.
ReadIndex(ctx context.Context, rctx []byte) error
// Status returns the current status of the raft state machine.
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
// Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
// snapshot (for e.g., while streaming it from leader to follower), should be reported to the
// leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
// log probes until the follower can apply the snapshot and advance its state. If the follower
// can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
// updates from the leader. Therefore, it is crucial that the application ensures that any
// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
// log probing in the follower.
ReportSnapshot(id uint64, status SnapshotStatus)
// Stop performs any necessary termination of the Node.
Stop()
}
Node
中的方法是上层用户能够操作 Raft 的所有方法了,其中大部分方法我们都已经在 raftexample 中见过了:
Tick()
用于充当内部时钟,驱动系统运行(比如选举超时、心跳超时)Propose()
提议一个普通命令,Raft 会在尝试内部达成共识ProposeConfChange
提议一个配置变更命令Ready
上层用户需要通过这个方法来获取当前时刻的状态Advance
用于通知 Raft 可以继续发送下一个Ready
ApplyConfChange
应用配置变更,当Ready
中包含配置变更信息时需要调用该方法 当然也有一些我们还没见过的方法Compagin
: 让节点马上进行选举Step
,让节点进入某种状态,比如StepLeader
TransferLeadership
:用于将 Leader 转让给其他节点。这是 Raft 论文中提到的一点优化ReadIndex
:用于优化性能,也是 Raft 论文中提到的优化方式Stop
终止节点
RawNode
Node
只是定义了这些接口供上层调用,但真正操作 Raft 的是 RawNode
,node
只是将其作为自己的一个成员变量,并对外提供了一组线程的方法而已。
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
rn *RawNode
}
RawNode
是 node
操作的真正对象,其中的方法都是线程不安全的,因此如果要实现自己的 Node
,还需要考虑线程安全问题
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
raft *raft
prevSoftSt *SoftState
prevHardSt pb.HardState
}
prevSoftSt
中存储的是一些易失性的信息,用于打日志或者调试(比如 leader, 当前状态),不需要存储到稳定存储中prevHardSt
中存储的是必须存储到稳定存储中的信息,比如 term, vote, commit(虽然在 Raft 论文中说不需要持久化,但是为了快速重启也可以持久化)raft
存储了 Raft 内部运行所需要的信息,比如raftLog
,heartbeatTimeout
,electionTimeout
等
其实 HardState
中的所有信息都可以在 raft
中找到,那为什么要单独将他抽取出来呢?我觉得这些信息是上层用户需要保存的信息,所以单独抽取出来比较方便。比如之前提到过用户需要使用 Ready
来获取需要获取并报错当前的状态,其中就包括了 HardState
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
// 不用写入稳固存储,包括 Leader ID、状态
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
// 将会写入到稳固存储中,包括 Term、Vote、Commit
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
*SoftStete
表示当前节点的软状态,即不需要持久化的状态pb.HardState
表示当前节点的硬状态,即必须持久化的状态ReadStates
一个ReadState
数组切片,用于实现线性一致性Entries
,entry 数组,表示需要被保存的 entriesSnapshot
表示需要被保存的快照CommittedEntries
表示要被提交到状态机中的 etnriesMessages
表示要发送出去的消息MustSync
表示是否需要将HardState
和Enties
同步刷盘
可以看到这里还有一个 Message
字段,这是 Raft 节点需要发送的消息,这一部分需要用户自己发送,也就是说需要用户自己实现消息传送功能,并将这些消息发送指定的 Raft 节点。