> 文章列表 > 【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存

【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存

【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存

sync.Mutex 互斥锁

如果我们要是实现并发缓存,那么我们要引入sync.Mutex 互斥锁来保证多个协程不冲突,确保同一时间只有一个协程运行,我们在使用的时候使用Lock() 和unLock()来实现阻塞

实现并发读写

实现ByteView表示缓存值 1.go

package geecache
type ByteView struct {b []byte //缓存值,byte是为了通用性
}
func (v ByteView) Len() int {return len(v.b)
}
func (v ByteView) ByteSlice() []byte {return cloneBytes(v.b)
}
func (v ByteView) String() string { // 返回一个字符串的拷贝return string(v.b)
}func cloneBytes(b []byte) []byte { // 返回一个byte的拷贝c := make([]byte, len(b))copy(c, b)return c
}
  • ByteView,b表示实际的缓存值
  • Len()实现接口,表示所占的内存
  • ByteSlice() 返回一个拷贝,因为ByteView不能修改

封装lru.Cache ,添加并发属性 2.go

实现cache数据结构

package geecache
import ("geecache/lru""sync"
)
type cache struct {mu         sync.Mutexlru        *lru.CachecacheBytes int64
}

cache : 里面封装了lru的Cache,控制并发的mutex锁,和缓存大小

实现增加和get函数

func (c *cache) add(key string, value ByteView) {c.mu.Lock()defer c.mu.Unlock()if c.lru == nil {c.lru = lru.New(c.cacheBytes, nil)}c.lru.Add(key, value)
}
func (c *cache) get(key string) (value ByteView, ok bool) {c.mu.Lock()defer c.mu.Unlock()if c.lru == nil {return}if v, ok := c.lru.Get(key); ok {return v.(ByteView), ok}return
}

add和get函数逻辑类似,首先使用mutex锁保证不冲突,然后查看cache已经存在,如果不存在则初始化,然后添加/获得数据

defer 的 使用

这里使用了defer,defer的作用就是令函数最后执行,所以虽然 c.mu.Lock()
    defer c.mu.Unlock()写在一起,但是Unlock()是最后运行的,即保证协程不冲突,又提高代码可读性,不会忘记解锁

实现Group ,负责控制缓存值的存取 group.go

实现回调函数,在缓存不存在时获取数据

当我们在缓存中找不到数据时,此时我们需要从外界获取数据,由于数据来源的多种,我们不应该考虑这么多,为了拓展性和可读性,我们实现一个回调函数来通知用户我们需要数据,用户通过这个接口把数据传入

package geecache
import ("fmt""log""sync"
)
type Getter interface {Get(key string) ([]byte, error)
}
type GetterFunc func(key string) ([]byte, error)
func (f GetterFunc) Get(key string) ([]byte, error) {return f(key)
}

这里定义了Getter接口,GetterFunc函数类型,并且为这个类型实现了Getter接口Get方法

在这里为什么使用接口而不用直接函数实现呢?
  • 接口比函数更好的地方在于接口并不关心传入的数据类型,所以接口可以实现多态,更灵活也更节省代码,面对其他情况也能处理
  • 而函数需要形参,对传入参数有要求,面对复杂场景无法处理

实现Group数据结构

type Group struct {name      stringgetter    GettermainCache cache
}
var (mu     sync.RWMutexgroups = make(map[string]*Group)
)```
- name 表示缓存的名字
- getter 回调函数,用于不命中缓存时从用户获取数据
- mainCache 缓存,实现lru算法,add/get等操作
除此以外还有两个全局变量
- mu RW锁 允许多个同时读,禁止读写和写写同时操作
- groups # 实例化函数
```go
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {if getter == nil {panic("nil Getter")}mu.Lock()defer mu.Unlock()g := &Group{name:      name,getter:    getter,mainCache: cache{cacheBytes: cacheBytes},}groups[name] = greturn g
}

首先还是判断回调函数是否正常(是否为空),正常则开启mutex锁,保证协程正常运行,然后实例化

实现Get函数和GetGroup函数

func GetGroup(name string) *Group {  mu.RLock()  g := groups[name]  mu.RUnlock()  return g  
}
  • GetGroup函数通过缓存名字得到group
func (g *Group) Get(key string) (ByteView, error) {if key == "" {return ByteView{}, fmt.Errorf("key is required")}if v, ok := g.mainCache.get(key); ok {log.Println("[GeeCache] hit")return v, nil}return g.load(key)
}
func (g *Group) load(key string) (value ByteView, err error) {return g.getLocally(key)
}
func (g *Group) getLocally(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneBytes(bytes)}g.populateCache(key, value)return value, nil
}
func (g *Group) populateCache(key string, value ByteView) {g.mainCache.add(key, value)
}
  • 实现Get函数,首先判断key是否存在实现过滤,然后查看缓存,找的到就返回,否则使用load函数去进一步查找
  • load函数调用getLocally,在分布式场景会使用其他函数获取
  • getLocally 首先使用回调函数,如果成功得到数据则将源数据添加到缓存 mainCache 中(通过 populateCache 方法)
  • populateCache 将数据添加main_Cache缓存

收获总结:

  • 了解接口的使用场景,它和函数之间的差别和优略势
  • 测试文件要以_test结尾
  • 系统设计要严谨,要考虑后期的拓展性和维护 ,比如load函数考虑到了分布式场景
  • 数据结构之间的封装

实现代码和测试代码

1.go

package geecachetype ByteView struct {b []byte //缓存值,byte是为了通用性}func (v ByteView) Len() int {return len(v.b)}func (v ByteView) ByteSlice() []byte {return cloneBytes(v.b)}func (v ByteView) String() string { // 返回一个字符串的拷贝return string(v.b)}func cloneBytes(b []byte) []byte { // 返回一个byte的拷贝c := make([]byte, len(b))copy(c, b)return c}

2.go

package geecacheimport ("geecache/lru""sync")type cache struct {mu         sync.Mutexlru        *lru.CachecacheBytes int64}func (c *cache) add(key string, value ByteView) {c.mu.Lock()defer c.mu.Unlock()if c.lru == nil {c.lru = lru.New(c.cacheBytes, nil)}c.lru.Add(key, value)}func (c *cache) get(key string) (value ByteView, ok bool) {c.mu.Lock()defer c.mu.Unlock()if c.lru == nil {return}if v, ok := c.lru.Get(key); ok {return v.(ByteView), ok}return}

group.go

package geecacheimport ("fmt""log""sync")type Group struct {name      stringgetter    GettermainCache cache}type Getter interface {Get(key string) ([]byte, error)}type GetterFunc func(key string) ([]byte, error)func (f GetterFunc) Get(key string) ([]byte, error) {return f(key)}var (mu     sync.RWMutexgroups = make(map[string]*Group))func NewGroup(name string, cacheBytes int64, getter Getter) *Group {if getter == nil {panic("nil Getter")}mu.Lock()defer mu.Unlock()g := &Group{name:      name,getter:    getter,mainCache: cache{cacheBytes: cacheBytes},}groups[name] = greturn g}func GetGroup(name string) *Group {mu.RLock()g := groups[name]mu.RUnlock()return g}func (g *Group) Get(key string) (ByteView, error) {if key == "" {return ByteView{}, fmt.Errorf("key is required")}if v, ok := g.mainCache.get(key); ok {log.Println("[GeeCache] hit")return v, nil}return g.load(key)}func (g *Group) load(key string) (value ByteView, err error) {return g.getLocally(key)}func (g *Group) getLocally(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneBytes(bytes)}g.populateCache(key, value)return value, nil}func (g *Group) populateCache(key string, value ByteView) {g.mainCache.add(key, value)}

group_test.go

package geecacheimport ("fmt""log""reflect""testing")var db = map[string]string{"Tom":  "630","Jack": "589","Sam":  "567",}func TestGetter(t *testing.T) {var f Getter = GetterFunc(func(key string) ([]byte, error) {return []byte(key), nil})expect := []byte("key")if v, _ := f.Get("key"); !reflect.DeepEqual(v, expect) {t.Fatal("callback failed")}}func TestGet(t *testing.T) {loadCounts := make(map[string]int, len(db))gee := NewGroup("scores", 2<<10, GetterFunc(func(key string) ([]byte, error) {log.Println("[SlowDB] search key", key)if v, ok := db[key]; ok {if _, ok := loadCounts[key]; !ok {loadCounts[key] = 0}loadCounts[key]++return []byte(v), nil}return nil, fmt.Errorf("%s not exist", key)}))for k, v := range db {if view, err := gee.Get(k); err != nil || view.String() != v {t.Fatal("failed to get value of Tom")}if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {t.Fatalf("cache %s miss", k)}}if view, err := gee.Get("unknown"); err == nil {t.Fatalf("the value of unknow should be empty, but %s got", view)}}func TestGetGroup(t *testing.T) {groupName := "scores"NewGroup(groupName, 2<<10, GetterFunc(func(key string) (bytes []byte, err error) { return }))if group := GetGroup(groupName); group == nil || group.name != groupName {t.Fatalf("group %s not exist", groupName)}if group := GetGroup(groupName + "111"); group != nil {t.Fatalf("expect nil, but %s got", group.name)}}