找回密码
 立即注册
查看: 251|回复: 0

Go 分布式缓存简单实现

[复制链接]
发表于 2022-6-12 19:04 | 显示全部楼层 |阅读模式
一个性能优异的系统架构,缓存是必不可少的。但在缓存的使用,可能会遇到一些问题,比如,内存不够、并发写入冲突、单机性能差等问题。为了解决这些问题,我们引入了分布式缓存。

本次分享主要通过学习极客兔兔 的文章,对关键节点进行了实现和解读。
缓存淘汰策略 LRU



  • 首先,定义缓存数据结构,使用双向链表和字典进行数据存储和查找。
    // Cache 使用最近最少使用算法,并发访问不安全type Cache struct {    maxBytes  int64                         // 允许使用的最大内存    nbytes    int64                         // 当前已经使用的内存    ll        *list.List                    // 双向链表    cache     map[string]*list.Element      // 键是字符串、值是双向链表中对应节点的指针    OnEvicted func(key string, value Value) // 可选参数,某条记录被移除时的回调函数,在清除条目时执行}

  • 实现基本操作,包括增、删、查(改的复杂度较高,涉及到改的操作时,可以先删除,再新增)
    // Get 从字典中找到对应的双向链表的节点,并将节点移动到队尾func (c *Cache) Get(key string) (value Value, ok bool) {    if ele, ok := c.cache[key]; ok { // 如果键对应的节点存在        c.ll.MoveToFront(ele) // 将对应节点移动到队尾(双向链表作为队列,队首队尾是相对的,这里约定 front 为队尾),并返回查找到的值        kv := ele.Value.(*entry)        return kv.value, true    }    return}// RemoveOldest 删除,实际上是缓存淘汰,即移除最近最少访问的节点(队首)func (c *Cache) RemoveOldest() {    ele := c.ll.Back() // 取队首节点,从链表中删除    if ele != nil {        c.ll.Remove(ele)        kv := ele.Value.(*entry)        delete(c.cache, kv.key)                         // 从字典中删除该节点的映射关系        c.nbytes -= int64(len(kv.key) + kv.value.Len()) // 更新当前使用内存长度        if c.OnEvicted != nil {                         // 当回调函数不为空时进行调用            c.OnEvicted(kv.key, kv.value)        }    }}// Add 新增/修改func (c *Cache) Add(key string, value Value) {    if ele, ok := c.cache[key]; ok { // 如果对应键存在,更新对应节点的值,并将该节点移到队尾        c.ll.MoveToFront(ele)        kv := ele.Value.(*entry)        c.nbytes += int64(value.Len() - kv.value.Len()) // 更新内存        kv.value = value    } else { // 如果对应键不存在则是新增场景,        ele := c.ll.PushFront(&entry{key: key, value: value}) // 队尾添加新节点,并在字典中添加 key 和节点的映射关系        c.cache[key] = ele        c.nbytes += int64(len(key) + value.Len()) // 更新内存    }    for c.maxBytes != 0 && c.maxBytes < c.nbytes { // 如果超过设定的最大值,移除最少访问的节点        c.RemoveOldest()    }}

  • 提供 New 方法,在初始化时指定存储最大值和回调函数。
    func New(maxBytes int64, onEvicted func(string, Value)) *Cache {    return &Cache{        maxBytes:  maxBytes,        ll:        list.New(),        cache:     make(map[string]*list.Element),        OnEvicted: onEvicted,    }}
一致性哈希


一致性哈希的实现是基于哈希环实现的,主要为了解决数据倾斜和节点增删改时缓存雪崩的问题。


  • 首先,定义数据结构。这里除了真实节点记录在哈希环上外,还引入了虚拟节点,形成虚拟节点到真实节点的映射,虚拟节点的个数和虚拟节点倍数相关。
    type Hash func(data []byte) uint32type Map struct {    hash     Hash           // 哈希函数    replicas int            // 虚拟节点倍数    keys     []int          // 哈希环    hashMap  map[int]string // 虚拟节点与真实节点的映射表, 键是虚拟节点的哈希值,值是真实节点的名称}

  • 实现基本操作,新增和查询。
    func (m *Map) Add(keys ...string) {    for _, key := range keys {        for i := 0; i < m.replicas; i++ {            hash := int(m.hash([]byte(strconv.Itoa(i) + key))) // 通过添加编号的方式区分不同虚拟节点            m.keys = append(m.keys, hash)            m.hashMap[hash] = key // 增加虚拟节点和真实节点的映射关系        }    }    sort.Ints(m.keys)}func (m *Map) Get(key string) string {    if len(m.keys) == 0 {        return ""    }    hash := int(m.hash([]byte(key)))                   // 计算 key 的哈希值    idx := sort.Search(len(m.keys), func(i int) bool { // 顺时针找到第一个匹配的虚拟节点的下标 idx,从 m.keys 中获取到对应的哈希值        return m.keys >= hash    })    return m.hashMap[m.keys[idx%len(m.keys)]] // 因为 m.keys 是一个环状结构,所以用取余数的方式来处理这种情况}

  • 提供 New 方法,在初始化时指定虚拟节点倍数和哈希函数。
    func New(replicas int, fn Hash) *Map {    m := &Map{        replicas: replicas,        hash:     fn,        hashMap:  make(map[int]string),    }    if m.hash == nil {        m.hash = crc32.ChecksumIEEE // 默认为 crc32.ChecksumIEEE 算法    }    return m}
使用锁机制防止缓存击穿


缓存击穿指大量请求同时访问缓存中不存在数据,因为缓存中不存在这个数据,请求就会击穿到 DB,造成瞬时 DB 压力骤增。解决方案也很简单,针对相同的 key,在访问时,只发起一次请求。


  • 首先,定义数据结构。这里的 sync.WaitGroup 是为了解决不同协程之间的消息传递, sync.Mutex 则是为了解决并发读写而加上的锁。
    // call 代表正在进行中,或已经结束的请求type call struct {    wg  sync.WaitGroup    val interface{}    err error}// Group 管理不同 key 的请求(call)type Group struct {    mu sync.Mutex    m  map[string]*call}

  • 控制实现。
    // Do 针对相同的 key,无论 Do 被调用多少次,函数 fn 都只会被调用一次,等待 fn 调用结束了,返回返回值或错误func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {    g.mu.Lock()    if g.m == nil {        g.m = make(map[string]*call)    }    if c, ok := g.m[key]; ok {        g.mu.Unlock()        c.wg.Wait()         // 如果请求正在进行中,则等待        return c.val, c.err // 请求结束,返回结果    }    c := new(call)    c.wg.Add(1)  // 发起请求前加锁    g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理    g.mu.Unlock()    c.val, c.err = fn() // 调用 fn,发起请求    c.wg.Done()         // 请求结束    g.mu.Lock()    delete(g.m, key) // 更新 g.m    g.mu.Unlock()    return c.val, c.err // 返回结果}
节点之间通过 Protobuf 通信


Protobuf 的安装和使用教程可以参考官网,推荐使用 homebrew 进行安装。


  • 创建并编辑 .proto 文件,进行数据结构的定义。
    syntax = "proto3";package geecachepb;option go_package = "./";message Request {  string group = 1;  string key = 2;}message Response {  bytes value = 1;}service GroupCache {  rpc Get(Request) returns (Response);}

  • 使用 protoc 生成 Go 代码。
    protoc --go_out=. *.proto

  • 在项目中使用生成的 Go 代码。
    import (    pb "path/geecachepb")func (h *httpGetter) Get(in *pb.Request, out *pb.Response) error {    u := fmt.Sprintf("%v%v/%v", h.baseURL, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()))    res, err := http.Get(u)    if err != nil {        return err    }    defer res.Body.Close()    if res.StatusCode != http.StatusOK {        return fmt.Errorf("server returned: %v", res.Status)    }    bytes, err := ioutil.ReadAll(res.Body)    if err != nil {        return fmt.Errorf("reading response body: %v", err)    }    if err = proto.Unmarshal(bytes, out); err != nil { // 使用 proto.Unmarshal() 解码 HTTP 响应        return fmt.Errorf("decoding response body: %v", err)    }    return nil}
源代码这里
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-26 16:49 , Processed in 0.087859 second(s), 25 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表