前面提到过,用户可以通过 StartNode 或者 RestartNode 启动或者重启一个 Raft 节点,StartNode 方法如下所示:

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
	if len(peers) == 0 {
		panic("no peers given; use RestartNode instead")
	}
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	rn.Bootstrap(peers)

	n := newNode(rn)

	go n.run()
	return &n
}

其中 NewRawNode 返回一个 RawNode 类型的节点,表示 raft 相关结构,并且该结构会被封装到 node 结构中并供上层使用者操作(比如 Propose 一个新命令或者新配置、Tick 时钟,接收 Ready, Advance)。node 实现了 Node 接口,实现其定义的相关方法

Node

// Node represents a node in a raft cluster.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log. Note that proposals can be lost without
	// notice, therefore it is user's job to ensure proposal retries.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes a configuration change. Like any proposal, the
	// configuration change may be dropped with or without an error being
	// returned. In particular, configuration changes are dropped unless the
	// leader has certainty that there is no prior unapplied configuration
	// change in its log.
	//
	// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
	// message. The latter allows arbitrary configuration changes via joint
	// consensus, notably including replacing a voter. Passing a ConfChangeV2
	// message is only allowed if all Nodes participating in the cluster run a
	// version of this library aware of the V2 API. See pb.ConfChangeV2 for
	// usage details and semantics.
	ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// Ready returns a channel that returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by Ready.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready

	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example. when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last ready.
	Advance()
	// ApplyConfChange applies a config change (previously passed to
	// ProposeConfChange) to the node. This must be called whenever a config
	// change is observed in Ready.CommittedEntries, except when the app decides
	// to reject the configuration change (i.e. treats it as a noop instead), in
	// which case it must not be called.
	//
	// Returns an opaque non-nil ConfState protobuf which must be recorded in
	// snapshots.
	ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState

	// TransferLeadership attempts to transfer leadership to the given transferee.
	TransferLeadership(ctx context.Context, lead, transferee uint64)

	// ReadIndex request a read state. The read state will be set in the ready.
	// Read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	// Note that request can be lost without notice, therefore it is user's job
	// to ensure read index retries.
	ReadIndex(ctx context.Context, rctx []byte) error

	// Status returns the current status of the raft state machine.
	Status() Status
	// ReportUnreachable reports the given node is not reachable for the last send.
	ReportUnreachable(id uint64)
	// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
	// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
	// Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
	// snapshot (for e.g., while streaming it from leader to follower), should be reported to the
	// leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
	// log probes until the follower can apply the snapshot and advance its state. If the follower
	// can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
	// updates from the leader. Therefore, it is crucial that the application ensures that any
	// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
	// log probing in the follower.
	ReportSnapshot(id uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the Node.
	Stop()
}

Node 中的方法是上层用户能够操作 Raft 的所有方法了,其中大部分方法我们都已经在 raftexample 中见过了:

  • Tick() 用于充当内部时钟,驱动系统运行(比如选举超时、心跳超时)
  • Propose() 提议一个普通命令,Raft 会在尝试内部达成共识
  • ProposeConfChange 提议一个配置变更命令
  • Ready 上层用户需要通过这个方法来获取当前时刻的状态
  • Advance 用于通知 Raft 可以继续发送下一个 Ready
  • ApplyConfChange 应用配置变更,当 Ready 中包含配置变更信息时需要调用该方法 当然也有一些我们还没见过的方法
  • Compagin: 让节点马上进行选举
  • Step,让节点进入某种状态,比如 StepLeader
  • TransferLeadership:用于将 Leader 转让给其他节点。这是 Raft 论文中提到的一点优化
  • ReadIndex:用于优化性能,也是 Raft 论文中提到的优化方式
  • Stop 终止节点

RawNode

Node 只是定义了这些接口供上层调用,但真正操作 Raft 的是 RawNodenode 只是将其作为自己的一个成员变量,并对外提供了一组线程的方法而已。

// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChangeV2
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	rn *RawNode
}

RawNodenode 操作的真正对象,其中的方法都是线程不安全的,因此如果要实现自己的 Node,还需要考虑线程安全问题

// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
	raft       *raft
	prevSoftSt *SoftState
	prevHardSt pb.HardState
}
  • prevSoftSt 中存储的是一些易失性的信息,用于打日志或者调试(比如 leader, 当前状态),不需要存储到稳定存储中
  • prevHardSt 中存储的是必须存储到稳定存储中的信息,比如 term, vote, commit(虽然在 Raft 论文中说不需要持久化,但是为了快速重启也可以持久化)
  • raft 存储了 Raft 内部运行所需要的信息,比如 raftLog, heartbeatTimeout, electionTimeout

其实 HardState 中的所有信息都可以在 raft 中找到,那为什么要单独将他抽取出来呢?我觉得这些信息是上层用户需要保存的信息,所以单独抽取出来比较方便。比如之前提到过用户需要使用 Ready 来获取需要获取并报错当前的状态,其中就包括了 HardState

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	// 不用写入稳固存储,包括 Leader ID、状态
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	// 将会写入到稳固存储中,包括 Term、Vote、Commit
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
  • *SoftStete 表示当前节点的软状态,即不需要持久化的状态
  • pb.HardState 表示当前节点的硬状态,即必须持久化的状态
  • ReadStates 一个 ReadState 数组切片,用于实现线性一致性
  • Entries,entry 数组,表示需要被保存的 entries
  • Snapshot 表示需要被保存的快照
  • CommittedEntries 表示要被提交到状态机中的 etnries
  • Messages 表示要发送出去的消息
  • MustSync 表示是否需要将 HardStateEnties 同步刷盘

可以看到这里还有一个 Message 字段,这是 Raft 节点需要发送的消息,这一部分需要用户自己发送,也就是说需要用户自己实现消息传送功能,并将这些消息发送指定的 Raft 节点。