zt3ff3n 发表于 2022-6-12 19:04

Go 分布式缓存简单实现

一个性能优异的系统架构,缓存是必不可少的。但在缓存的使用,可能会遇到一些问题,比如,内存不够、并发写入冲突、单机性能差等问题。为了解决这些问题,我们引入了分布式缓存。

本次分享主要通过学习极客兔兔 的文章,对关键节点进行了实现和解读。
缓存淘汰策略 LRU


[*]
首先,定义缓存数据结构,使用双向链表和字典进行数据存储和查找。
// Cache 使用最近最少使用算法,并发访问不安全type Cache struct {    maxBytesint64                         // 允许使用的最大内存    nbytes    int64                         // 当前已经使用的内存    ll      *list.List                  // 双向链表    cache   map*list.Element      // 键是字符串、值是双向链表中对应节点的指针    OnEvicted func(key string, value Value) // 可选参数,某条记录被移除时的回调函数,在清除条目时执行}
[*]
实现基本操作,包括增、删、查(改的复杂度较高,涉及到改的操作时,可以先删除,再新增)
// Get 从字典中找到对应的双向链表的节点,并将节点移动到队尾func (c *Cache) Get(key string) (value Value, ok bool) {    if ele, ok := c.cache; ok { // 如果键对应的节点存在      c.ll.MoveToFront(ele) // 将对应节点移动到队尾(双向链表作为队列,队首队尾是相对的,这里约定 front 为队尾),并返回查找到的值      kv := ele.Value.(*entry)      return kv.value, true    }    return}// RemoveOldest 删除,实际上是缓存淘汰,即移除最近最少访问的节点(队首)func (c *Cache) RemoveOldest() {    ele := c.ll.Back() // 取队首节点,从链表中删除    if ele != nil {      c.ll.Remove(ele)      kv := ele.Value.(*entry)      delete(c.cache, kv.key)                         // 从字典中删除该节点的映射关系      c.nbytes -= int64(len(kv.key) + kv.value.Len()) // 更新当前使用内存长度      if c.OnEvicted != nil {                         // 当回调函数不为空时进行调用            c.OnEvicted(kv.key, kv.value)      }    }}// Add 新增/修改func (c *Cache) Add(key string, value Value) {    if ele, ok := c.cache; ok { // 如果对应键存在,更新对应节点的值,并将该节点移到队尾      c.ll.MoveToFront(ele)      kv := ele.Value.(*entry)      c.nbytes += int64(value.Len() - kv.value.Len()) // 更新内存      kv.value = value    } else { // 如果对应键不存在则是新增场景,      ele := c.ll.PushFront(&entry{key: key, value: value}) // 队尾添加新节点,并在字典中添加 key 和节点的映射关系      c.cache = ele      c.nbytes += int64(len(key) + value.Len()) // 更新内存    }    for c.maxBytes != 0 && c.maxBytes < c.nbytes { // 如果超过设定的最大值,移除最少访问的节点      c.RemoveOldest()    }}
[*]
提供 New 方法,在初始化时指定存储最大值和回调函数。
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {    return &Cache{      maxBytes:maxBytes,      ll:      list.New(),      cache:   make(map*list.Element),      OnEvicted: onEvicted,    }}
一致性哈希


一致性哈希的实现是基于哈希环实现的,主要为了解决数据倾斜和节点增删改时缓存雪崩的问题。

[*]
首先,定义数据结构。这里除了真实节点记录在哈希环上外,还引入了虚拟节点,形成虚拟节点到真实节点的映射,虚拟节点的个数和虚拟节点倍数相关。
type Hash func(data []byte) uint32type Map struct {    hash   Hash         // 哈希函数    replicas int            // 虚拟节点倍数    keys   []int          // 哈希环    hashMapmapstring // 虚拟节点与真实节点的映射表, 键是虚拟节点的哈希值,值是真实节点的名称}
[*]
实现基本操作,新增和查询。
func (m *Map) Add(keys ...string) {    for _, key := range keys {      for i := 0; i < m.replicas; i++ {            hash := int(m.hash([]byte(strconv.Itoa(i) + key))) // 通过添加编号的方式区分不同虚拟节点            m.keys = append(m.keys, hash)            m.hashMap = key // 增加虚拟节点和真实节点的映射关系      }    }    sort.Ints(m.keys)}func (m *Map) Get(key string) string {    if len(m.keys) == 0 {      return ""    }    hash := int(m.hash([]byte(key)))                   // 计算 key 的哈希值    idx := sort.Search(len(m.keys), func(i int) bool { // 顺时针找到第一个匹配的虚拟节点的下标 idx,从 m.keys 中获取到对应的哈希值      return m.keys >= hash    })    return m.hashMap] // 因为 m.keys 是一个环状结构,所以用取余数的方式来处理这种情况}
[*]
提供 New 方法,在初始化时指定虚拟节点倍数和哈希函数。
func New(replicas int, fn Hash) *Map {    m := &Map{      replicas: replicas,      hash:   fn,      hashMap:make(mapstring),    }    if m.hash == nil {      m.hash = crc32.ChecksumIEEE // 默认为 crc32.ChecksumIEEE 算法    }    return m}
使用锁机制防止缓存击穿


缓存击穿指大量请求同时访问缓存中不存在数据,因为缓存中不存在这个数据,请求就会击穿到 DB,造成瞬时 DB 压力骤增。解决方案也很简单,针对相同的 key,在访问时,只发起一次请求。

[*]
首先,定义数据结构。这里的 sync.WaitGroup 是为了解决不同协程之间的消息传递, sync.Mutex 则是为了解决并发读写而加上的锁。
// call 代表正在进行中,或已经结束的请求type call struct {    wgsync.WaitGroup    val interface{}    err error}// Group 管理不同 key 的请求(call)type Group struct {    mu sync.Mutex    mmap*call}
[*]
控制实现。
// Do 针对相同的 key,无论 Do 被调用多少次,函数 fn 都只会被调用一次,等待 fn 调用结束了,返回返回值或错误func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {    g.mu.Lock()    if g.m == nil {      g.m = make(map*call)    }    if c, ok := g.m; ok {      g.mu.Unlock()      c.wg.Wait()         // 如果请求正在进行中,则等待      return c.val, c.err // 请求结束,返回结果    }    c := new(call)    c.wg.Add(1)// 发起请求前加锁    g.m = c // 添加到 g.m,表明 key 已经有对应的请求在处理    g.mu.Unlock()    c.val, c.err = fn() // 调用 fn,发起请求    c.wg.Done()         // 请求结束    g.mu.Lock()    delete(g.m, key) // 更新 g.m    g.mu.Unlock()    return c.val, c.err // 返回结果}
节点之间通过 Protobuf 通信


Protobuf 的安装和使用教程可以参考官网,推荐使用 homebrew 进行安装。

[*]
创建并编辑 .proto 文件,进行数据结构的定义。
syntax = "proto3";package geecachepb;option go_package = "./";message Request {string group = 1;string key = 2;}message Response {bytes value = 1;}service GroupCache {rpc Get(Request) returns (Response);}
[*]
使用 protoc 生成 Go 代码。
protoc --go_out=. *.proto
[*]
在项目中使用生成的 Go 代码。
import (    pb "path/geecachepb")func (h *httpGetter) Get(in *pb.Request, out *pb.Response) error {    u := fmt.Sprintf("%v%v/%v", h.baseURL, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()))    res, err := http.Get(u)    if err != nil {      return err    }    defer res.Body.Close()    if res.StatusCode != http.StatusOK {      return fmt.Errorf("server returned: %v", res.Status)    }    bytes, err := ioutil.ReadAll(res.Body)    if err != nil {      return fmt.Errorf("reading response body: %v", err)    }    if err = proto.Unmarshal(bytes, out); err != nil { // 使用 proto.Unmarshal() 解码 HTTP 响应      return fmt.Errorf("decoding response body: %v", err)    }    return nil}
源代码这里
页: [1]
查看完整版本: Go 分布式缓存简单实现