首先来回顾一下选举的大概流程:

  1. Follower 长时间没有收到心跳,成为 Candidate,发起一轮投票
  2. Candidate 如果接收到多数的投票就会成为 Leader
  3. 如果 Leader 发现有其他的 Leader 并且 term 比自己高,就会变为 Follower

这是 Raft 论文中提到的大致流程。但是有时因为网络隔离等问题,Follower 会认为 Leader 已经 crash,于是会发起投票。但是投票无法到达大多数节点,于是这个 Follower 会不断地发起投票请求,但是又不断失败,最终会导致 term 会很大。当网络恢复正常时,之前被隔离的 Follower 会发起新的选举,会导致现有 Leader 变为 Follower,但由于 Log 落后,导致集群中所有的节点都无法当选,直到补齐 term 差距。这就会造成在相当长的一段时间内系统的不可用

在 Raft 作者的博士论文中给出了一种解决方案:引入一个预投票状态(PreCandidate)。当 Follower 超时时,进入预选举状态,这种状态下 term 不会增加,直到收到大多数投票同意后,才进入选举状态,重新发送投票请求。

引入预选举阶段的状态机如下所示:

graph LR
    *((*)) --start up --> Follower((Follower))
    Follower -- timeout, 
                start election --> PreCandidate((PreCandidate))
    PreCandidate -- timeout,
                 new election --> PreCandidate
    PreCandidate -- receives votes from
                 majority of servers --> Candidate((Candidate))
    Candidate -- receives votes from
                    majority of servers--> Leader((Leader))
    PreCandidate -- discovers current
	            leader or new term  --> Follower
	Candidate -- discovers current
	            leader or new term --> Follower
    Leader -- discovers server
              with higher term --> Follower

在 etcd 中,预选举阶段和选举阶段使用相同的 Candidate 状态表示。

raft 结构很长,这里只列出选举相关的属性

type raft struct {
	id uint64
	Term uint64
	Vote uint64

	readStates []ReadState

	// the log
	raftLog *raftLog
	state StateType

	// isLearner is true if the local raft node is a learner.
	isLearner bool

	msgs []pb.Message

	// the leader id
	lead uint64
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee uint64

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	randomizedElectionTimeout int

	tick func()
	step stepFunc
}

raft 结构中有一个 step stepFunc 属性,他是一个函数,接收 raftMessage。每当需要改变状态时,都会给这个 step 成员赋值相应的方法。step 的可能取值有下面几种:

  • stepLeader,当 Raft 节点成为 Leader 时会为其赋值
  • stepFollower,当 Raft 节点变为 Follower 时会为其赋值
  • stepCandidate,当 Raft 节点变为 Candidate 时会为其赋值

step 类似,tick 也有会有不同的取值:

  • tickElection,Follower/Candidate 会调用该方法发起选举
  • tickHeartbeat,Leader 会调用该方法发送心跳

由于与时钟相关的逻辑是由用户实现的,所以用户必须调用 Node.Tick() 方法来驱动内部的运行,该方法最终会调用 raft.tick 来调用 tickElection 或者 tickHeartbeat

超时选举

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		// 发送 Hub 消息
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

当调用 tickElection 时发现已经已经超过一段时间没有接收到心跳包了,就会认为 Leader 已经 crash 了,于是就会调用 Step 方法尝试发起一轮选举。

注意这里 MessageType 字段:

  • MsgHup:不用于节点间通信,仅用于发送给本节点让本节点进行选举
  • MsgBeat: 不用于节点间通信,仅用于 leader 节点在 heartbeat 定时器到期时向集群中其他节点发送心跳消息
  • MsgHeartbeatResp: 用于 follower 向 leader 应答心跳消息
  • MsgVote/MsgPreVote: 节点投票消息,要求其他节点给自己投票
  • MsgVoteResp/MsgPreVoteResp:用于节点相应投票消息
  • ......

关于 Step() 方法,这里只给出与发送选举相关的代码,其他部分会在后面讲解相应功能时给出

func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message 本地:选举消息 啥都不做
	case m.Term > r.Term:
		// ......
	case m.Term < r.Term:
		// ......
	}

	switch m.Type {
	case pb.MsgHup:
		if r.preVote {
			// 如果开启了预选举,发送预选举消息
			r.hup(campaignPreElection)
		} else {
			// 否则开始选举
			r.hup(campaignElection)
		}

	case pb.MsgVote, pb.MsgPreVote:
		// ......
	default:
		err := r.step(r, m)
	}
	return nil
}

我们注意到,在 tickElection() 方法中我们发送消息时并没有为消息的 Term 赋值,在 Go 中,如果没有赋值,uint64 中的默认值为 0,这表示是一个本地消息。在处理该消息时会根据你是否开启了预选举功能来选择发送预选举消息还是选举消息(etcd 中提供了选项来启用预选举,可以在 Config.PreVote 中选择是否开启)。发送选举消息的相关逻辑在 raft.hub() 中,该方法会调用 raft.campaign 来开启一轮选举

// campaign transitions the raft instance to candidate state. This must only be  
// called after verifying that this is a legitimate transition.  
func (r *raft) campaign(t CampaignType) {  
   // 判断是否能够发起选举,需要满足条件  
   if !r.promotable() {}  
   var term uint64  
   var voteMsg pb.MessageType  
   if t == campaignPreElection {  // 预选举  
      r.becomePreCandidate()  
      voteMsg = pb.MsgPreVote  
      // PreVote RPCs are sent for the next term before we've incremented r.Term.  
      term = r.Term + 1  
   } else {  // 选举: 成为 Candidate; term ++; 为自己投票  
      r.becomeCandidate()  
      voteMsg = pb.MsgVote  
      term = r.Term  
   }  
   // 处理单节点集群的情况
   if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {  }  

   for _, id := range ids {  
      if id == r.id {   // 跳过自己,已经投过票了
         continue  
      }  
      // 发送投票请求  
      r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})  
   }  
}

需要注意的是,如果发起的是预选举投票,虽然候选者本身的 term 不会增加,但是发送的请求中的 term 还是会增加 1 的。

投票响应

投票请求所需要的条件参与与论文中要求的一致:

  1. 候选者的 Term >= 自己的 term
  2. 如果还没有投票或者已经为该候选者投过票了,并且候选者的 log 至少和自己的一样新
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
	case m.Term > r.Term:
		// 消息的 term 比自己当前的 term 还大的情况, PreVote, response
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			// 对应于 campaignTransfer,即 Leader 指定该 Server 成为 Leader 的情况
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				return nil
			}
		}
		switch {
		// 预选举以及预选举响应(没有拒绝的),不需要处理
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
		default:
			// 其他情况表明其他 Server 有更高的 term,所以自己需要转化为 follower
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} else if m.Type == pb.MsgPreVote {
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
		} 
		return nil
	}

	switch m.Type {
	case pb.MsgHup:
	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		// 判断是否能投票
		canVote := r.Vote == m.From || // 已经为其投过票了
			(r.Vote == None && r.lead == None) || // 还没投过票
			(m.Type == pb.MsgPreVote && m.Term > r.Term) // 预投票阶段(不会改变 term,所以需要额外判断)
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			// 拒绝投票
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}

可以看到,低于 m.Term < r.Term 的情况下,投票请求会被拒绝。当时如果 m.Term > r.Term,根据论文的描述自己应该将自己的 term 设置为 1,并转变为 Follower。但是 etcd 引入了租约(lease) 机制,如果租约还有效,即便 m.Term > r.Term 也不会进行投票。

Check Quorum: Leader 每隔一段时间主动检查 Follower 是否活跃,如果活跃数量达不到 quorum,则 Leader 会主动下台,成为 Follower

如果按照论文中描述的将读请求也作为一个 log entry 并对齐达成共识的话,会对性能造成严重的影响。但是如果绕过 Log Read 又可能会读取到过期的数据,所以引入这一机制是为了减少读取过期数据的可能性。

需要注意的是,Check Quorum 只能减少可能性,并不能完全消除过期读

租约(Lease) 机制要求 Leader 在选举超时时间过期前给 Follower 发送消息,防止他们发起选举。

维持心跳

当选 Leader 后,为了防止其他节点发起选举,Leader 还需要定期向 Follower 发送心跳信息。心跳的触发也是通过 tick 来触发的,在 becomeLeader 方法中,该属性被设置为了 tickHeartbeat

// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

tickHeartbeattickElection 类似,都是超时会发送相应的消息。不同的是 tickHeartbeat 还会对 Leader transfer 的情况进行处理,并且在 check quorum 开启的情况下还没主动想 Follower 发送 MsgCheckQuorum 类型的消息来检查 Follower 是否还活跃

Leader Transfer: Leader 指定一个 Follower 成为新的 Leader,该 Follower 会立马发起选举,请求成为 Leader。但是如果一段时间内 Follower 还没有成为 Leader,Leader 就会认为 该 Follower 不适合成为 Leader,所以会 abort 该 transfer 请求