找回密码
 立即注册
查看: 284|回复: 0

Etcd Raft (二) 消息类型整理

[复制链接]
发表于 2021-12-25 07:01 | 显示全部楼层 |阅读模式
Overview


Etcd中的Raft使用了protobuf进行选举轮次间的消息通讯,其中包含的消息类型定义于raft.proto中:
// For description of different message types, see:// https://pkg.go.dev/go.etcd.io/etcd/raft/v3#hdr-MessageTypeenum MessageType {    MsgHup             = 0;    MsgBeat            = 1;    MsgProp            = 2;    MsgApp             = 3;    MsgAppResp         = 4;    MsgVote            = 5;    MsgVoteResp        = 6;    MsgSnap            = 7;    MsgHeartbeat       = 8;    MsgHeartbeatResp   = 9;    MsgUnreachable     = 10;    MsgSnapStatus      = 11;    MsgCheckQuorum     = 12;    MsgTransferLeader  = 13;    MsgTimeoutNow      = 14;    MsgReadIndex       = 15;    MsgReadIndexResp   = 16;    MsgPreVote         = 17;    MsgPreVoteResp     = 18;}
其核心为Message类,记录当前的任期(Term),信息的发送和接收方(From->To),以及需要传递的Context、Commit、Entry等字段。
type Message struct {    Type MessageType      To   uint64          From uint64          Term uint64          // logTerm is generally used for appending Raft logs to followers. For example,    // (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at    // index=101, and the term of entry at index 100 is 5.    // (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some    // entries from its leader as it already has an entry with term 5 at index 100.    LogTerm    uint64       Index      uint64       Entries    []Entry      Commit     uint64       Snapshot   Snapshot     Reject     bool         RejectHint uint64       Context    []byte   }(1) MsgHup


MsgHup用于开启选举。假如Follower和Candidate没有收到心跳,则开启一轮新的选举。实现于raft.go的tickElection中:
// tickElection is run by followers and candidates after r.electionTimeout.func (r *raft) tickElection() {    r.electionElapsed++    // 自己可以被promote & election timeout 超时了,规定时间没有听到心跳发起选举    // 发送MsgHup    if r.promotable() && r.pastElectionTimeout() {        r.electionElapsed = 0        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {            r.logger.Debugf("error occurred during election: %v", err)        }    }}
在核心方法Step中,接收到MsgHup则开启选举。在这里如果preVote为true,则需要进行第一次选举,否则执行常规选举:
    campaignPreElection :第一次选举campaignElection : 常规的基于时间的选举campaginTransfer : 表示转移Leader
switch m.Type {case pb.MsgHup:    if r.preVote {        r.hup(campaignPreElection)    } else {        r.hup(campaignElection)    }...}
其中hup方法本质就是调用campagin,将raft实例转化为Candidate状态,并发送voteMsg给其他node:
func (r *raft) hup(t CampaignType) {    // 主要核对是否是合法的Follower      if r.state == StateLeader || !r.promotable(){        return    }    ...    }    // 核对完成,开始选举    r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)    r.campaign(t)}func (r *raft) campaign(t CampaignType) {    ...    var term uint64    var voteMsg pb.MessageType    // 更改状态    if t == campaignPreElection {        r.becomePreCandidate()        voteMsg = pb.MsgPreVote        // PreVote RPCs are sent for the next term before we've incremented r.Term.        term = r.Term + 1    } else {        r.becomeCandidate()        voteMsg = pb.MsgVote        term = r.Term    }    // 投票,超过半数返回quorum的VoteWon,产生Leader    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {        // 假如只有一个node还赢得了选举,改变到正常选举状态        if t == campaignPreElection {            r.campaign(campaignElection)        } else {            r.becomeLeader()        }        return    }    // 获取所有id并排序    var ids []uint64    {        idMap := r.prs.Voters.IDs()        ids = make([]uint64, 0, len(idMap))        for id := range idMap {            ids = append(ids, id)        }        sort.Slice(ids, func(i, j int) bool { return ids < ids[j] })    }    for _, id := range ids {        // 跳过自己         if id == r.id {            continue        }        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)        // 假如要转让Leader,记录上下文        var ctx []byte        if t == campaignTransfer {            ctx = []byte(t)        }        // 以voteMsg的形式发送给其他follower当前的信息          r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})    }}(2) MsgBeat & MsgHeartbeat & MsgHeartbeatResp

    MsgBeat 是一个内部类型,在tickHeartbeat(tickElection外的另一个时钟方法)中触发Leader向Follower发送心跳信号MsgHeartbeat
func (r *raft) tickHeartbeat() {    r.heartbeatElapsed++    r.electionElapsed++    // electionTimeout : default 10次    // 超过则需要重新计时    if r.electionElapsed >= r.electionTimeout {        r.electionElapsed = 0        if r.checkQuorum {            if err := r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}); err != nil {                r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)            }        }        // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.        if r.state == StateLeader && r.leadTransferee != None {            r.abortLeaderTransfer()        }    }    // Only leader sends heartbeat    if r.state != StateLeader {        return    }        if r.heartbeatElapsed >= r.heartbeatTimeout {        r.heartbeatElapsed = 0        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {            r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)        }    }}
Leader接收到MsgBeat后开始发送心跳:
switch m.Type {case pb.MsgBeat:    r.bcastHeartbeat()    return nil...}// bcastHeartbeat 发送RPC,包含之前的上下文func (r *raft) bcastHeartbeat() {    lastCtx := r.readOnly.lastPendingRequestCtx()    if len(lastCtx) == 0 {        r.bcastHeartbeatWithCtx(nil)    } else {        r.bcastHeartbeatWithCtx([]byte(lastCtx))    }}func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {    r.prs.Visit(func(id uint64, _ *tracker.Progress) {        if id == r.id {            return        }        r.sendHeartbeat(id, ctx)    })}
    MsgHeartbeat在sendHeartBeat中发送来自Leader的心跳。
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {    // 发送当前最小匹配的commit : min(to.matched, r.committed)    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)    m := pb.Message{        To:      to,        Type:    pb.MsgHeartbeat,        Commit:  commit,        Context: ctx,    }    r.send(m)}
在核心函数Step中,当收到的msg任期大于当前任期时,在以下情况中需要变为Follower:
    msgApp : 日志信息msgHeartbeart :心跳信息msgSnap :快照信息
case m.Term > r.Term:    ...    if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {                r.becomeFollower(m.Term, m.From)            } else {                r.becomeFollower(m.Term, None)            }}
当 MsgHeartbeat 的任期高于Candidate的任期,Candidate变回Follower;若是Follower则重新计时。
stepCandidate:    case pb.MsgHeartbeat:        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term        r.handleHeartbeat(m)stepFollower:    case pb.MsgHeartbeat:        r.electionElapsed = 0        r.lead = m.From        r.handleHeartbeat(m)
    MsgHeartbeatResp是MsgHeartbeat的响应,在handleHeartbeat中由Follower(Candidate已经变成了Follower)记录当前commit,随后发送响应。
func (r *raft) handleHeartbeat(m pb.Message) {    r.raftLog.commitTo(m.Commit)    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})}
Leader得知是哪个Follower回应了自己的心跳,假如Leader的commitId大于Follower的match index,则发送sendAppend补充日志。
StepLeader:case pb.MsgHeartbeatResp:        pr.RecentActive = true        pr.ProbeSent = false        // 假如Follower尚未接受的MsgApp已经满了,空出一个slot        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {            pr.Inflights.FreeFirstOne()        }        // 只有当 leader last committed index 大于 follower Match index, 让Follower追加日志        if pr.Match < r.raftLog.lastIndex() {            r.sendAppend(m.From)        }        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {            return nil        }        // 处理只读相关        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {            return nil        }        rss := r.readOnly.advance(m)        for _, rs := range rss {            if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {                r.send(resp)            }        }(3) MsgProp


MsgProp提出向log entry追加日志的提议,这些提议将被重定向给Leader,视作Leader的本地消息,因此在send中我们不能将发送者的Term追加到待发送的提议中。MsgReadIndex也是同理。
send:    ...    if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {        m.Term = r.Term    }
Follower接收到MsgProp后,发送提议给Leader。假如Candidate收到MsgProp,说明Follower发给Leader的信息被发给了自己,当前没有Leader,则放弃该提议。
stepFollower:case pb.MsgProp:    ...    m.To = r.lead    r.send(m)stepCandidate:case pb.MsgProp:    r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)    return ErrProposalDropped
Leader处理MsgProp时,首先调用appendEntry将条目追加到自己的Log中,随后调用bcastAppend将这些条目发送给其他节点。
stepLeader:case pb.MsgProp:    ...    if !r.appendEntry(m.Entries...) {        return ErrProposalDropped    }    r.bcastAppend()    return nil(4) MsgApp & MsgAppResp


MsgApp包含要复制的日志entry和当前的Term,在maybeSendAppend中发送该消息。假如无法正确获取Term和Entry,可能当前日志条目已经被snapshot了,因此发送snapshot。否则就发送追加这段entry的消息。
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {    ...    term, errt := r.raftLog.term(pr.Next - 1)    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)    if len(ents) == 0 && !sendIfEmpty {        return false    }    if errt != nil || erre != nil {        // send snapshot if we failed to get term or entries        ...    } else {        m.Type = pb.MsgApp        m.Index = pr.Next - 1        m.LogTerm = term        m.Entries = ents        m.Commit = r.raftLog.committed        if n := len(m.Entries); n != 0 {            switch pr.State {            // optimistically increase the next when in StateReplicate            case tracker.StateReplicate:                last := m.Entries[n-1].Index                pr.OptimisticUpdate(last)                pr.Inflights.Add(last)            // last index 未知,因此无法更新            case tracker.StateProbe:                pr.ProbeSent = true            default:                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)            }        }    }    r.send(m)    return true}
Follower和Candidate收到MsgApp的处理后,需要追加日志。在此有可能发生日志冲突(Follower和Leader日志不一致),需要根据hintIndex查找当前冲突的index。
func (r *raft) handleAppendEntries(m pb.Message) {    // 当前committed已经超过了msg的日志,直接响应    if m.Index < r.raftLog.committed {        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})        return    }    // Append后响应    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})    } else {        r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",            r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)        // 查找可能发生冲突的索引        // 也可以使用lastIndex-1,使用index和lastIndex的最小值更快        hintIndex := min(m.Index, r.raftLog.lastIndex())        hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)        hintTerm, err := r.raftLog.term(hintIndex)        if err != nil {            panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))        }        r.send(pb.Message{            To:         m.From,            Type:       pb.MsgAppResp,            Index:      m.Index,            Reject:     true,            RejectHint: hintIndex,            LogTerm:    hintTerm,        })    }}
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2025-5-17 09:38 , Processed in 0.134965 second(s), 25 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表