找回密码
 立即注册
查看: 237|回复: 0

200 行代码实现基于 Paxos 的 KV 存储

[复制链接]
发表于 2022-5-20 16:29 | 显示全部楼层 |阅读模式
[图片上传失败...(image-4b0a80-1653035715823)]
前言


写完【paxos 的直观解释】之后,网友都说疗效甚好,但是也会对这篇教程中一些环节提出疑问(有疑问说明真的看懂了 ),例如怎么把只能确定一个值的 paxos 应用到实际场景中。

既然 Talk is cheap,那么就 Show me the code,这次我们把教程中描述的内容直接用代码实现出来,希望能覆盖到教程中的涉及的每个细节。帮助大家理解 paxos 的运行机制。

这是一个基于 paxos,200 行代码的 kv 存储系统的简单实现,作为paxos 的直观解释这篇教程中的代码示例部分。Paxos 的原理本文不再介绍了,本文提到的数据结构使用【protobuf】定义,网络部分使用【grpc】定义。另外 200 行 go 代码实现 paxos 存储。

文中的代码可能做了简化, 完整代码实现在【paxoskv】这个项目中(naive 分支)。
运行和使用


跑一下:
git clone https://github.com/openacid/paxoskv.gitcd paxoskvgo test -v ./...
这个项目中除了 paxos 实现,用 3 个 test case 描述了 3 个 paxos 运行的例子,
    【TestCase1SingleProposer】:无冲突运行。【TestCase2DoubleProposer】:有冲突运行。【Example_setAndGetByKeyVer】作为 key-val 使用。

测试代码描述了几个 paxos 运行例子的行为,运行测试可以确认 paxos 的实现符合预期。

本文中 protobuf 的数据结构定义如下:
service PaxosKV {    rpc Prepare (Proposer) returns (Acceptor) {}    rpc Accept (Proposer) returns (Acceptor) {}}message BallotNum {    int64 N          = 1;    int64 ProposerId = 2;}message Value {    int64 Vi64 = 1;}message PaxosInstanceId {    string Key = 1;    int64  Ver = 2;}message Acceptor {    BallotNum LastBal = 1;    Value     Val     = 2;    BallotNum VBal    = 3;}message Proposer {    PaxosInstanceId Id  = 1;    BallotNum       Bal = 2;    Value           Val = 3;}

以及主要的函数实现:
// struct KVServerStorage : map[string]Versionsfunc Accept(c context.Context, r *Proposer) (*Acceptor, error)func Prepare(c context.Context, r *Proposer) (*Acceptor, error)func getLockedVersion(id *PaxosInstanceId) *Version// struct Proposerfunc Phase1(acceptorIds []int64, quorum int) (*Value, *BallotNum, error)func Phase2(acceptorIds []int64, quorum int) (*BallotNum, error)func RunPaxos(acceptorIds []int64, val *Value) *Valuefunc rpcToAll(acceptorIds []int64, action string) []*Acceptorfunc ServeAcceptors(acceptorIds []int64) []*grpc.Server从头实现 Paxoskv


Paxos 相关的数据结构

在这个例子中我们的数据结构和服务框架使用【protobuf】和【grpc】实现,首先是最底层的 paxos 数据结构:Proposer 和 Acceptor在【slide-27】中我们介绍了 1 个 Acceptor 所需的字段:
在存储端(Acceptor)也有几个概念:
    last_rnd 是 Acceptor 记住的最后一次进行写前读取的 Proposer(客户端)是谁,以此来决定谁可以在后面真正把一个值写到存储中。v 是最后被写入的值。vrnd 跟 v 是一对, 它记录了在哪个 Round 中 v 被写入了。
[图片上传失败...(image-5118ad-1653036090548)]

原文中这些名词是参考了【paxos made simple】中的名称,但在【Leslie Lamport】后面的几篇 paper 中都换了名称,为了后续方便,在【paxoskv】的代码实现中也做了相应的替换:
rnd      ==> Bal   // 每一轮paxos的编号, BallotNumvrnd     ==> VBal  // 在哪个Ballot中v被Acceptor 接受(voted)last_rnd ==> LastBal
Proposer 的字段也很简单,它需要记录:
    当前的 ballot number:Bal,以及它选择在 Phase2 运行的值:Val(【slide-29】)。

于是在这个项目中用 protobuf 定义这两个角色的数据结构,如代码【paxoskv.proto】中的声明,如下:
message Acceptor {  BallotNum LastBal = 1;  Value     Val = 2;  BallotNum VBal = 3;}message Proposer {  PaxosInstanceId Id = 1;  BallotNum Bal = 2;  Value     Val = 3;}
其中 Proposer 还需要一个 PaxosInstanceId,来标识当前的 paxos 实例为哪个 key 的哪个 version 在做决定,【paxos made simple】中只描述了一个 paxos 实例的算法(对应一个 key 的一次修改),要实现多次修改,就需要增加这个字段来区分不同的 paxos 实例:
message PaxosInstanceId {  string Key = 1;  int64  Ver = 2;}
【paxoskv.proto】还定义了一个 BallotNum,因为要保证全系统内的 BallotNum 都有序且不重复,一般的做法就是用一个本地单调递增的整数,和一个全局唯一的 id 组合起来实现:
message BallotNum {    int64 N = 1;    int64 ProposerId = 2;}
定义 RPC 消息结构

RPC 消息定义了 Proposer 和 Acceptor 之间的通讯。

在一个 paxos 系统中,至少要有 4 个消息:
    Phase 1 的 Prepare-request,Prepare-replyPhase 2 的 Accept-request,Accept-reply

如【slide-28】所描述的(原文中使用 rnd,这里使用 Bal,都是同一个概念):
Phase- 1(Prepare):
request:    Bal: intreply:    LastBal: int    Val:     string    VBal:    int
Phase- 2(Accept):
request:    Bal: int    Val:   stringreply:    LastBal: int
在 Prepare-request 或 Accept-request 中,发送的是一部分或全部的 Proposer 的字段,因此我们在代码中:
    直接把 Proposer 的结构体作为 request 的结构体同样把 Acceptor 的结构体作为 reply 的结构体

在使用的时候只使用其中几个字段,对应我们的 RPC 服务【PaxosKV】定义如下:
service PaxosKV {    rpc Prepare (Proposer) returns (Acceptor) {}    rpc Accept (Proposer) returns (Accetor) {}}使用 Protobuf 和 Grpc 生成服务框架



protobuf 可以将【paxoskv.proto】直接生成 go 代码(代码库中已经包含了生成好的代码:【paxoskv.pb.go】,只有修改【paxoskv.proto】之后才需要重新生成)
    首先安装 protobuf 的编译器 protoc,可以根据【install-protoc】中的步骤安装, 一般简单的一行命令就可以了:安装好之后通过 protoc--version 确认版本,至少应该是 3.x: libprotoc 3.13.0
      Linux:apt install-y protobuf-compilerMac:brew install protobuf
    安装 protoc 的 go 语言生成插件 protoc-gen-go:go get -u github.com/golang/protobuf/protoc-gen-go重新编译 protokv.proto 文件:直接 make gen 或:
  protoc       --proto_path=proto       --go_out=plugins=grpc:paxoskv       paxoskv.proto
生成后的【paxoskv.pb.go】代码中可以看到,其中主要的数据结构例如 Acceptor 的定义:
type Acceptor struct {  LastBal *BallotNum ...  Val     *Value ...  VBal    *BallotNum ...        ...}
以及 KV 服务的 client 端和 server 端的代码,client 端是实现好的,server 端只有一个 interface,后面我们需要来完成它的实现:
type paxosKVClient struct {  cc *grpc.ClientConn}type PaxosKVClient interface {  Prepare(    ctx context.Context,    in *Proposer,    opts ...grpc.CallOption  ) (*Acceptor, error)  Accept(    ctx context.Context,    in *Proposer,    opts ...grpc.CallOption  ) (*Acceptor, error)}type PaxosKVServer interface {  Prepare(context.Context,          *Proposer) (*Acceptor, error)  Accept(context.Context,         *Proposer) (*Acceptor, error)}实现存储的服务器端


【impl.go】是所有实现部分,我们定义一个 KVServer 结构体,用来实现 grpc 服务的 interface PaxosKVServer;其中使用一个内存里的 map 结构模拟数据的存储:
type Version struct {  mu       sync.Mutex  acceptor Acceptor}type Versions map[int64]*Versiontype KVServer struct {  mu      sync.Mutex  Storage map[string]Versions}
其中 Version 对应一个 key 的一次变化,也就是对应一个 paxos 实例,Versions 对应一个 key 的一系列变化,Storage 就是所有 key 的所有变化。
实现 Acceptor 的 grpc 服务 handler


Acceptor,是这个系统里的 server 端,监听一个端口,等待 Proposer 发来的请求并处理,然后给出应答。

根据 paxos 的定义,Acceptor 的逻辑很简单:在【slide-28】中描述:

[图片上传失败...(image-2c46d4-1653036090548)]

根据教程里的描述,为 KVServer 定义 handle Prepare-request 的代码:
func (s *KVServer) Prepare(    c context.Context,    r *Proposer) (*Acceptor, error) {  v := s.getLockedVersion(r.Id)  defer v.mu.Unlock()  reply := v.acceptor  if r.Bal.GE(v.acceptor.LastBal) {    v.acceptor.LastBal = r.Bal  }  return &reply, nil}
这段代码分 3 步:
    取得 paxos 实例,生成应答:Acceptor 总是返回 LastBal,Val,VBal  这 3 个字段,所以直接把 Acceptor 赋值给 reply。最后更新 Acceptor 的状态:然后按照 paxos 算法描述,如果请求中的 ballot number 更大,则记录下来,表示不在接受更小 ballot number 的 Proposer。

其中 getLockedVersion() 从 KVServer.Storage 中根据 request 发来的PaxosInstanceId 中的字段 key 和 ver 获取一个指定 Acceptor 的实例:
func (s *KVServer) getLockedVersion(    id *PaxosInstanceId) *Version {  s.mu.Lock()  defer s.mu.Unlock()  key := id.Key  ver := id.Ver  rec, found := s.Storage[key]  if !found {    rec = Versions{}    s.Storage[key] = rec  }  v, found := rec[ver]  if !found {    // initialize an empty paxos instance    rec[ver] = &Version{      acceptor: Acceptor{        LastBal: &BallotNum{},        VBal:    &BallotNum{},      },    }    v = rec[ver]  }  v.mu.Lock()  return v}
handle Accept-request 的处理类似,在【slide-31】中描述:

[图片上传失败...(image-8eee11-1653036090548)]

Accept() 要记录 3 个值,
    LastBal:Acceptor 看到的最大的 ballot number;Val:Proposer 选择的值,以及 VBal:Proposer 的 ballot number:
func (s *KVServer) Accept(    c context.Context,    r *Proposer) (*Acceptor, error) {  v := s.getLockedVersion(r.Id)  defer v.mu.Unlock()  reply := Acceptor{    LastBal: &*v.acceptor.LastBal,  }  if r.Bal.GE(v.acceptor.LastBal) {    v.acceptor.LastBal = r.Bal    v.acceptor.Val = r.Val    v.acceptor.VBal = r.Bal  }  return &reply, nil}
Acceptor 的逻辑到此完整了,再看 Proposer:

实现 Proposer 逻辑

Proposer 的运行分 2 个阶段,Phase1 和 Phase2,与 Prepare 和 Accept 对应。

Phase1

在【impl.go】的实现中,Proposer.Phase1() 函数负责 Phase1 的逻辑:
func (p *Proposer) Phase1(    acceptorIds []int64,    quorum int) (*Value, *BallotNum, error) {  replies := p.rpcToAll(acceptorIds, "Prepare")  ok := 0  higherBal := *p.Bal  maxVoted := &Acceptor{VBal: &BallotNum{}}  for _, r := range replies {    if !p.Bal.GE(r.LastBal) {      higherBal = *r.LastBal      continue    }    if r.VBal.GE(maxVoted.VBal) {      maxVoted = r    }    ok += 1    if ok == quorum {      return maxVoted.Val, nil, nil    }  }  return nil, &higherBal, NotEnoughQuorum}
这段代码首先通过 rpcToAll() 向所有 Acceptor 发送 Prepare-request 请求, 然后找出所有的成功的 reply:
    如果发现一个更大的 ballot number,表示一个 Prepare 失败:有更新的Proposer 存在;否则,它是一个成功的应答,再看它有没有返回一个已经被 Acceptor 接受(voted)的值。

最后,成功应答如果达到多数派(quorum),则认为 Phase1 完成,返回最后一个被 voted 的值,也就是 VBal 最大的那个。让上层调用者继续 Phase2;如果没有达到 quorum,这时可能是有多个 Proposer 并发运行而造成冲突,有更大的 ballot number,这时则把见到的最大 ballot number 返回,由上层调用者提升 ballot number 再重试。

client 与 server 端的连接

上面用到的 rpcToAll 在这个项目中的实现 client 端(Proposer)到 server 端(Acceptor)的通讯,它是一个十分 简洁美观 简陋的 grpc 客户端实现:
func (p *Proposer) rpcToAll(    acceptorIds []int64,    action string) []*Acceptor {  replies := []*Acceptor{}  for _, aid := range acceptorIds {    var err error    address := fmt.Sprintf("127.0.0.1:%d",        AcceptorBasePort+int64(aid))    conn, err := grpc.Dial(        address, grpc.WithInsecure())    if err != nil {      log.Fatalf("did not connect: %v", err)    }    defer conn.Close()    c := NewPaxosKVClient(conn)    ctx, cancel := context.WithTimeout(        context.Background(), time.Second)    defer cancel()    var reply *Acceptor    if action == "Prepare" {      reply, err = c.Prepare(ctx, p)    } else if action == "Accept" {      reply, err = c.Accept(ctx, p)    }    if err != nil {      continue    }    replies = append(replies, reply)  }  return replies}Phase2


Proposer 运行的 Phase2 在【slide-30】中描述,比 Phase1 更简单:
在第 2 阶段 phase-2,Proposer X 将它选定的值写入到 Acceptor 中,这个值可能是它自己要写入的值,或者是它从某个 Acceptor 上读到的 v(修复)。
func (p *Proposer) Phase2(    acceptorIds []int64,    quorum int) (*BallotNum, error) {  replies := p.rpcToAll(acceptorIds, "Accept")  ok := 0  higherBal := *p.Bal  for _, r := range replies {    if !p.Bal.GE(r.LastBal) {      higherBal = *r.LastBal      continue    }    ok += 1    if ok == quorum {      return nil, nil    }  }  return &higherBal, NotEnoughQuorum}
我们看到,它只需要确认成 Phase2 的功应答数量达到 quorum 就可以了。另外同样它也有责任在 Phase2 失败时返回看到的更大的 ballot number,因为在 Phase1 和 Phase2 之间可能有其他 Proposer 使用更大的 ballot number 打断了当前 Proposer 的执行,就像【slide-33】的冲突解决的例子中描述的那样。
完整的 Paxos 逻辑


完整的 paxos 由 Proposer 负责,包括:如何选择一个值,使得一致性得以保证。如【slide-29】中描述的:
Proposer X 收到多数(quorum)个应答,就认为是可以继续运行的。如果没有联系到多于半数的 acceptor,整个系统就 hang 住了,这也是 paxos 声称的只能运行少于半数的节点失效。这时 Proposer 面临 2 种情况:所有应答中都没有任何非空的 v,这表示系统之前是干净的,没有任何值已经被其他 paxos 客户端完成了写入(因为一个多数派读一定会看到一个多数派写的结果),这时 Proposer X 继续将它要写的值在 phase-2 中真正写入到多于半数的 Acceptor 中。如果收到了某个应答包含被写入的 v 和 vrnd,这时,Proposer X 必须假设有其他客户端(Proposer)正在运行,虽然 X 不知道对方是否已经成功结束,但任何已经写入的值都不能被修改!所以 X 必须保持原有的值。于是 X 将看到的最大 vrnd 对应的 v 作为 X 的 phase-2 将要写入的值。这时实际上可以认为 X 执行了一次(不知是否已经中断的)其他客户端(Proposer)的修复。
[图片上传失败...(image-d8c890-1653036090548)]

基于 Acceptor 的服务端和 Proposer 2 个 Phase 的实现,最后把这些环节组合到一起组成一个完整的 paxos,在我们的代码【RunPaxos】这个函数中完成这些事情:
func (p *Proposer) RunPaxos(    acceptorIds []int64,    val *Value) *Value {  quorum := len(acceptorIds)/2 + 1  for {    p.Val = val    maxVotedVal, higherBal, err := p.Phase1(        acceptorIds, quorum)    if err != nil {      p.Bal.N = higherBal.N + 1      continue    }    if maxVotedVal != nil {      p.Val = maxVotedVal    }    // val == nil 是一个读操作,    // 没有读到voted值不需要Phase2    if p.Val == nil {      returnnil    }    higherBal, err = p.Phase2(        acceptorIds, quorum)    if err != nil {      p.Bal.N = higherBal.N + 1      continue    }    return p.Val  }}
这段代码完成了几件事:运行 Phase1,有 voted 的值就选它,没有就选自己要写的值 val,然后运行 Phase2。
就像 Phase1 Phase2 中描述的一样,任何一个阶段,如果没达到 quorum,就需要提升遇到的更大的 ballot number,重试去解决遇到的 ballot number 冲突。这个函数接受 2 个参数:
    所有 Acceptor 的列表(用一个整数的 id 表示一个 Acceptor),以及要提交的值。

其中,按照 paxos 的描述,这个值 val 不一定能提交:如果 paxos 在 Phase1 完成后看到了其他已经接受的值(voted value),那就要选择已接收的值,放弃 val。遇到这种情况,在我们的系统中,例如要写入  key=foo,ver=3 的值为 bar,如果没能选择 bar,就要选择下一个版本  key=foo,ver=4 再尝试写入。这样不断的重试循环, 写操作最终都能成功写入一个值(voted value)。
实现读操作


在我们这个 NB(naive and basic)的系统中,读和写一样都要通过一次 paxos 算法来完成。因为写入过程就是一次 paxos 执行,而 paxos 只保证在一个 quorum 中写入确定的值,不保证所有节点都有这个值。因此一次读操作如果要读到最后写入的值,至少要进行一次多数派读。

但多数派读还不够:它可能读到一个未完成的 paxos 写入,如【slide-11】中描述的脏读问题,读取到的最大 VBal 的值,可能不是确定的值(写入到多数派)。

例如下面的状态:
Val=foo    Val=bar    ?VBal=3     VBal=2     ?-------    -------    --A0         A1         A2
如果 Proposer 试图读,在 Phase1 联系到 A0 A1 这 2 个 Acceptor,那么 foo 和 bar 这 2 个值哪个是确定下来的,要取决于 A2 的状态。所以这时要再把最大VBal 的值跑完一次  Phase2,让它被确定下来,然后才能把结果返回给上层(否则另一个 Proposer 可能联系到 A1 和 A2,然后认为 Val=bar 是被确定的值)。

当然如果 Proposer 在读取流程的 Phase1 成功后没有看到任何已经 voted 的值(例如没有看到 foo 或 bar), 就不用跑 Phase2 了。

所以在这个版本的实现中,读操作也是一次【RunPaxos】函数的调用,除了它并不 propose 任何新的值,为了支持读操作,所以在上面的代码中 Phase2 之前加入一个判断,如果传入的 val 和已 voted 的值都为空,则直接返回:
if p.Val == nil {  return nil}
【Example_setAndGetByKeyVer】这个测试用例展示了如何使用 paxos 实现一个 kv 存储,实现读和写的代码大概这样:
prop := Proposer{  Id: &PaxosInstanceId{    Key: "foo",    Ver: 0,  },  Bal: &BallotNum{N: 0, ProposerId: 2},}// 写:v := prop.RunPaxos(acceptorIds, &Value{Vi64: 5})// 读:v := prop.RunPaxos(acceptorIds, nil)
到现在为止,本文中涉及到的功能都实现完了,完整实现在【impl.go】中。

接着我们用测试用例实现 1 下【paxos的直观解释】中列出的 2 个例子, 从代码看 poxos 的运行:
文中例子


第1个例子是 paxos 无冲突的运行【slide-32】:

[图片上传失败...(image-8e6da7-1653036090548)]

把它写成 test case,确认教程中每步操作之后的结果都如预期 【TestCase1SingleProposer】:
func TestCase1SingleProposer(t *testing.T) {  ta := require.New(t)  acceptorIds := []int64{0, 1, 2}  quorum := 2  // 启动3个Acceptor的服务  servers := ServeAcceptors(acceptorIds)  defer func() {    for _, s := range servers {      s.Stop()    }  }()  // 用要更新的key和version定义paxos 实例的id  paxosId := &PaxosInstanceId{    Key: "i",    Ver: 0,  }  var val int64 = 10  // 定义Proposer, 随便选个Proposer id 10.  var pidx int64 = 10  px := Proposer{    Id:  paxosId,    Bal: &BallotNum{N: 0, ProposerId: pidx},  }  // 用左边2个Acceptor运行Phase1,  // 成功, 没有看到其他的ballot number  latestVal, higherBal, err := px.Phase1(      []int64{0, 1}, quorum)  ta.Nil(err, "constitued a quorum")  ta.Nil(higherBal, "no other proposer is seen")  ta.Nil(latestVal, "no voted value")  // Phase1成功后, 因为没有看到其他voted的值,  // Proposer选择它自己的值进行后面的Phase2  px.Val = &Value{Vi64: val}  // Phase 2  higherBal, err = px.Phase2(      []int64{0, 1}, quorum)  ta.Nil(err, "constitued a quorum")  ta.Nil(higherBal, "no other proposer is seen")}
第 2 个例子对应 2 个 Proposer 遇到冲突并解决冲突的例子,略长不贴在文中了,代码可以在 【TestCase2DoubleProposer】看到。

640.jpg

工程


Paxos 的出色之处在于它将分布式一致性问题简化到最核心的部分,没有任何多余的设计。

工程实现上我们多数时候会用一个 paxos 的变体,它需要对 paxos 中的实例扩展为一系列多值的操作日志,支持完整的状态机,以及对运维提供支持成员变更,所以 raft 在工程上更受欢迎:

https://github.com/datafuselabs/openraft

创建 openraft 这个项目的目的是:
    优化和改良 raft 算法本身的问题:


      例如一个 term 内无法选出多个 leader,造成选举冲突过多的问题,例如不必要的 pre-vote 阶段的引入,例如 raft 作为一个一致性算法对外部时钟的依赖,
    • 例如强制的 leader/candidate 阶段的拆分使得换 leader 要经历一个无法服务的 candidate-state 的阶段。
      openraft 正在解决的这些问题,使之不仅仅是一个为了性能和安全用 rust 重写的项目。


    其次在用户接口上,提供一组语义明确的 async API。
参考链接


本文用到的代码在 paxoskv 项目的 naive 分支上:

【https://github.com/openacid/paxoskv/tree/naive】
    【paxos made simple】:http://lamport.azurewebsites.net/pubs/pubs.html#paxos-simple【Leslie Lamport】:http://www.lamport.org/【protobuf】:https://developers.google.com/protocol-buffers【install-protoc】:https://grpc.io/docs/protoc-installation/【grpc】:https://grpc.io/【paxos的直观解释】:https://blog.openacid.com/algo/paxos【issue】:https://github.com/openacid/paxoskv/issues/new/choose【paxoskv】:https://github.com/openacid/paxoskv/tree/naive【TestCase1SinglePropose】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/paxos_slides_case_test.go#L11【TestCase2DoubleProposer】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/paxos_slides_case_test.go#L57【Example_setAndGetByKeyVer】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/example_set_get_test.go【Openraft】: https://github.com/datafuselabs/openraft

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-27 05:23 , Processed in 0.095107 second(s), 26 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表