|
Overview
etcd 的 Raft 实现使用 protobuf 定义节点之间通信(包括选举、日志复制、心跳等)的消息,消息类型定义于 raft.proto 中:
// MessageType enumerates every kind of Raft message, including purely local
// ones that a node steps into itself (e.g. MsgHup, MsgBeat).
// For description of different message types, see:
// https://pkg.go.dev/go.etcd.io/etcd/raft/v3#hdr-MessageType
enum MessageType {
	MsgHup             = 0;
	MsgBeat            = 1;
	MsgProp            = 2;
	MsgApp             = 3;
	MsgAppResp         = 4;
	MsgVote            = 5;
	MsgVoteResp        = 6;
	MsgSnap            = 7;
	MsgHeartbeat       = 8;
	MsgHeartbeatResp   = 9;
	MsgUnreachable     = 10;
	MsgSnapStatus      = 11;
	MsgCheckQuorum     = 12;
	MsgTransferLeader  = 13;
	MsgTimeoutNow      = 14;
	MsgReadIndex       = 15;
	MsgReadIndexResp   = 16;
	MsgPreVote         = 17;
	MsgPreVoteResp     = 18;
}
其核心是 Message 结构体,记录消息携带的任期(Term)、消息的发送方和接收方(From -> To),以及需要传递的 Context、Commit、Entries 等字段。
// Message is the single envelope used for every Raft message. Which of the
// fields a sender fills in depends on Type (for example, sendHeartbeat only
// sets To, Type, Commit and Context).
type Message struct {
	Type MessageType
	To   uint64
	From uint64
	Term uint64
	// logTerm is generally used for appending Raft logs to followers. For example,
	// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
	// index=101, and the term of entry at index 100 is 5.
	// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
	// entries from its leader as it already has an entry with term 5 at index 100.
	LogTerm    uint64
	Index      uint64
	Entries    []Entry
	Commit     uint64
	Snapshot   Snapshot
	Reject     bool
	RejectHint uint64
	Context    []byte
}

(1) MsgHup
MsgHup 用于开启选举。如果 Follower 或 Candidate 在选举超时(electionTimeout)内没有收到 Leader 的心跳,就会向自己发送一条 MsgHup,开启新一轮选举。该逻辑实现于 raft.go 的 tickElection 中:
// tickElection is run by followers and candidates after r.electionTimeout.func (r *raft) tickElection() { r.electionElapsed++ // 自己可以被promote & election timeout 超时了,规定时间没有听到心跳发起选举 // 发送MsgHup if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil { r.logger.Debugf("error occurred during election: %v", err) } }}
在核心方法 Step 中,收到 MsgHup 即开启选举:如果启用了 preVote(预投票),先进行预选举(PreVote)阶段,否则直接进行常规选举。选举类型(CampaignType)有以下三种:
- campaignPreElection:预选举(PreVote)阶段,以 r.Term+1 发送投票请求,但不递增自身的任期
- campaignElection:常规的、由选举超时触发的选举
- campaignTransfer:由 Leader 转移(leader transfer)触发的选举
// Inside Step: a MsgHup starts a campaign. With preVote enabled the node first
// runs a pre-election; otherwise it goes straight to a normal election.
switch m.Type {
case pb.MsgHup:
	if r.preVote {
		r.hup(campaignPreElection)
	} else {
		r.hup(campaignElection)
	}
	...
}
其中 hup 方法在完成检查后调用 campaign:将 raft 实例转为 Candidate 状态(预选举时为 PreCandidate 状态),并向其他节点发送投票请求(voteMsg):
func (r *raft) hup(t CampaignType) { // 主要核对是否是合法的Follower if r.state == StateLeader || !r.promotable(){ return } ... } // 核对完成,开始选举 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) r.campaign(t)}func (r *raft) campaign(t CampaignType) { ... var term uint64 var voteMsg pb.MessageType // 更改状态 if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we've incremented r.Term. term = r.Term + 1 } else { r.becomeCandidate() voteMsg = pb.MsgVote term = r.Term } // 投票,超过半数返回quorum的VoteWon,产生Leader if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // 假如只有一个node还赢得了选举,改变到正常选举状态 if t == campaignPreElection { r.campaign(campaignElection) } else { r.becomeLeader() } return } // 获取所有id并排序 var ids []uint64 { idMap := r.prs.Voters.IDs() ids = make([]uint64, 0, len(idMap)) for id := range idMap { ids = append(ids, id) } sort.Slice(ids, func(i, j int) bool { return ids < ids[j] }) } for _, id := range ids { // 跳过自己 if id == r.id { continue } r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) // 假如要转让Leader,记录上下文 var ctx []byte if t == campaignTransfer { ctx = []byte(t) } // 以voteMsg的形式发送给其他follower当前的信息 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) }}(2) MsgBeat & MsgHeartbeat & MsgHeartbeatResp
MsgBeat 是一个本地(内部)消息类型:在 tickHeartbeat(除 tickElection 外的另一个时钟方法)中,每经过 heartbeatTimeout 个 tick,Leader 就向自己发送一条 MsgBeat,从而触发向各 Follower 发送心跳消息 MsgHeartbeat。
// tickHeartbeat is the tick function that, unlike tickElection, drives the
// heartbeat counter; only a leader actually sends heartbeats from it.
// Every call advances both heartbeatElapsed and electionElapsed.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	// electionTimeout is counted in ticks. Once it has elapsed, reset the
	// counter and run the once-per-election-timeout checks below.
	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			// NOTE(review): MsgCheckQuorum presumably makes the leader verify it
			// still has an active quorum; its handler is not shown here.
			if err := r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}); err != nil {
				r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
			}
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		// r.state is read after the Step call above, which may have changed it.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	// Only leader sends heartbeat
	if r.state != StateLeader {
		return
	}

	// Every heartbeatTimeout ticks, step a local MsgBeat, which makes the
	// leader broadcast MsgHeartbeat to its peers.
	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
			r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
		}
	}
}
Leader接收到MsgBeat后开始发送心跳:
// Inside stepLeader: a local MsgBeat makes the leader broadcast heartbeats.
switch m.Type {
case pb.MsgBeat:
	r.bcastHeartbeat()
	return nil
	...
}

// bcastHeartbeat sends a heartbeat to the peers, attaching the context returned
// by r.readOnly.lastPendingRequestCtx() when it is non-empty (presumably the
// context of the most recent pending read-only request), and nil otherwise.
func (r *raft) bcastHeartbeat() {
	lastCtx := r.readOnly.lastPendingRequestCtx()
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

// bcastHeartbeatWithCtx sends a MsgHeartbeat carrying ctx to every peer that
// r.prs.Visit walks over, skipping this node itself.
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendHeartbeat(id, ctx)
	})
}

MsgHeartbeat在sendHeartBeat中发送来自Leader的心跳。
func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // 发送当前最小匹配的commit : min(to.matched, r.committed) commit := min(r.prs.Progress[to].Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, } r.send(m)}
在核心函数 Step 中,当收到消息的任期大于当前任期时,节点会变为 Follower(省略部分的特殊情况除外)。如果消息是以下三种类型之一,则同时把发送方记为当前 Leader;其他类型则将 Leader 置为 None:
- MsgApp:日志追加消息
- MsgHeartbeat:心跳消息
- MsgSnap:快照消息
// Inside Step, when the incoming message carries a higher term than ours:
case m.Term > r.Term:
	...
	// In every branch below the node becomes a follower of m.Term. For
	// MsgApp, MsgHeartbeat and MsgSnap the sender is passed as the leader;
	// for any other type the leader is left as None.
	if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
		r.becomeFollower(m.Term, m.From)
	} else {
		r.becomeFollower(m.Term, None)
	}
}
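进入 stepCandidate / stepFollower 时,消息的任期已与当前任期相同(更高任期的情况已在 Step 中处理)。Candidate 收到同任期的 MsgHeartbeat,说明本任期已经产生了 Leader,于是变回 Follower;Follower 收到时则将 electionElapsed 清零(重新计时),并记录 Leader。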
stepCandidate:
// A heartbeat at our own term means this term already has a leader:
// step down, follow the sender, and answer the heartbeat.
case pb.MsgHeartbeat:
	r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
	r.handleHeartbeat(m)
stepFollower:
// A heartbeat resets the election timer and records the sender as leader.
case pb.MsgHeartbeat:
	r.electionElapsed = 0
	r.lead = m.From
	r.handleHeartbeat(m)

MsgHeartbeatResp是MsgHeartbeat的响应,在handleHeartbeat中由Follower(Candidate已经变成了Follower)记录当前commit,随后发送响应。
func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})}
Leader 收到 MsgHeartbeatResp 后,会将对应的 Follower 标记为活跃。如果该 Follower 的 Match 小于 Leader 最后一条日志的索引(lastIndex,而不是 commit 索引),就调用 sendAppend 为其补发日志。
StepLeader:
case pb.MsgHeartbeatResp:
	// The follower answered, so mark it recently active and clear ProbeSent.
	// ProbeSent is set when maybeSendAppend sends in StateProbe; clearing it
	// presumably lets the next probe go out.
	pr.RecentActive = true
	pr.ProbeSent = false
	// If the window of in-flight (unacknowledged) MsgApps to this follower is full, free one slot.
	if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
		pr.Inflights.FreeFirstOne()
	}
	// Only when the follower's Match is behind the leader's last log index
	// (lastIndex, not the commit index) does the leader send it more entries.
	if pr.Match < r.raftLog.lastIndex() {
		r.sendAppend(m.From)
	}
	// The rest handles ReadIndex requests in ReadOnlySafe mode, whose context
	// is echoed back in the heartbeat response.
	if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
		return nil
	}
	// Record this follower's ack for the context; continue only once the
	// acks add up to a quorum of voters (VoteWon).
	if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
		return nil
	}
	// Release the read-only requests confirmed by m and answer each one whose
	// response has a destination (resp.To != None).
	rss := r.readOnly.advance(m)
	for _, rs := range rss {
		if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
			r.send(resp)
		}
	}

(3) MsgProp
MsgProp 用于提议向日志追加条目(entry)。提议会被转发给 Leader,并被视作 Leader 的本地消息,因此 send 不会为 MsgProp 附加发送方的 Term。MsgReadIndex 同样会被转发给 Leader,处理方式相同。
send:
	...
	// MsgProp and MsgReadIndex are forwarded to the leader and treated as
	// local messages there, so the sender's term is deliberately not attached.
	if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
		m.Term = r.Term
	}
Follower 收到 MsgProp 后,将提议转发给 Leader(m.To = r.lead)。Candidate 收到 MsgProp 时,当前任期还没有已知的 Leader,无处转发,因此直接丢弃该提议并返回 ErrProposalDropped。
stepFollower:
case pb.MsgProp:
	...
	// Forward the proposal to the current leader.
	m.To = r.lead
	r.send(m)
stepCandidate:
case pb.MsgProp:
	// A candidate has no known leader to forward to, so the proposal is dropped.
	r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
	return ErrProposalDropped
Leader处理MsgProp时,首先调用appendEntry将条目追加到自己的Log中,随后调用bcastAppend将这些条目发送给其他节点。
stepLeader:
case pb.MsgProp:
	...
	// Append to the leader's own log first; if that fails the proposal is dropped.
	if !r.appendEntry(m.Entries...) {
		return ErrProposalDropped
	}
	// Then replicate the new entries to the other nodes.
	r.bcastAppend()
	return nil

(4) MsgApp & MsgAppResp
MsgApp 携带要复制的日志条目(Entries)、前一条日志的索引和任期(Index / LogTerm),以及 Leader 的 commit 索引,由 maybeSendAppend 发送。如果无法获取 pr.Next-1 处的任期或从 pr.Next 开始的日志条目(可能是这些日志已被压缩进快照),则改为发送快照;否则发送携带这些条目的 MsgApp。
// maybeSendAppend sends peer `to` either a MsgApp or, when the needed term or
// entries cannot be read from the log, a snapshot. Apart from any early exit in
// the elided prologue, it returns false without sending when there are no
// entries and sendIfEmpty is false; otherwise it sends and returns true.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	...
	// Term of the entry just before Next (becomes m.LogTerm), plus the entries
	// from Next onward, bounded by maxMsgSize.
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil {
		// send snapshot if we failed to get term or entries
		...
	} else {
		m.Type = pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			// While probing, the follower's last index is not known yet, so Next
			// is not advanced; only record that a probe has been sent.
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m)
	return true
}
Follower 和 Candidate 收到 MsgApp 后,在 handleAppendEntries 中尝试追加日志。如果日志不一致(本地在 m.Index 处的任期与 m.LogTerm 不符),就拒绝该消息,并通过 findConflictByTerm 计算一个提示索引(RejectHint)及其任期,帮助 Leader 更快定位两份日志开始分叉的位置。
func (r *raft) handleAppendEntries(m pb.Message) { // 当前committed已经超过了msg的日志,直接响应 if m.Index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } // Append后响应 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) // 查找可能发生冲突的索引 // 也可以使用lastIndex-1,使用index和lastIndex的最小值更快 hintIndex := min(m.Index, r.raftLog.lastIndex()) hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) hintTerm, err := r.raftLog.term(hintIndex) if err != nil { panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) } r.send(pb.Message{ To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: hintIndex, LogTerm: hintTerm, }) }} |
|