> 文章列表 > 【go项目-geecache】动手写分布式缓存 - day6 - 防止缓存击穿

【go项目-geecache】动手写分布式缓存 - day6 - 防止缓存击穿

【go项目-geecache】动手写分布式缓存 - day6 - 防止缓存击穿

收获

  1. 了解缓存穿透,缓存雪崩, 缓存击穿
  2. 知道怎么防止缓存击穿
  3. 了解匿名函数的使用
  4. 加深了锁的使用
  5. 了解空接口interface{}和模板的区别和作用

介绍缓存穿透,缓存雪崩, 缓存击穿

概念可以看我这篇博客缓存穿透,缓存雪崩,缓存击穿概念及解决方法
简单说一下缓存击穿,就是在缓存中找不到数据,导致直接访问数据库,降低效率

为什么我们要防止缓存击穿

当前我们所做的项目对数据库的访问没有做任何限制的,所以当客户端发起大量请求,很容易导致缓存击穿和穿透,所以我们需要减少重复请求

引入singleflight

singleflight是一种用于减少重复请求的技术,它可以避免在高并发场景下出现重复的请求
singleflight的实现方式是在请求前先检查是否已经有相同的请求正在处理,如果有,则等待该请求的处理结果并直接返回,避免重复发起请求。

实现singleflight数据结构

package singleflight  import "sync"  type call struct {  wg  sync.WaitGroup  val interface{}  err error  
}  type Group struct {  mu sync.Mutex       // protects m  m  map[string]*call  
}
  • call:表示正在进行中,或已经完成的函数调用,其中wg字段sync.WaitGroup 锁避免冲突,val字段是一个空接口,err字段表示函数调用是否发生了错误。
  • Group:表示一组正在进行中的函数调用,其中mu字段是一个互斥锁,用于保护m字段的读写,m字段是一个map,用于存储正在进行中的函数调用

这里我对sync.WaitGroupinterface{} 空接口解释一下:

sync.WaitGroup

用于等待一组goroutine执行完成。主要用于在主goroutine等待一组子goroutine执行完毕后再继续执行的场景。

interface{} 空接口

我们知道接口时一组方法的集合,接口类型定义了一组方法,但没有实现这些方法。空接口里面没有方法,意味着它可以表示任意类型的值,可以方便地实现通用性较强的函数或数据结构

空接口和模板的区别

  • 空接口的类型判断需要通过断言,而模板的数据类型是利用编译器进行类型推导来获得的
  • 模板在使用时需要在编译阶段进行类型检查,因此可以保证类型安全性

实现singleflight的Do方法

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {  g.mu.Lock()  if g.m == nil {  g.m = make(map[string]*call)  }  if c, ok := g.m[key]; ok {  g.mu.Unlock()  c.wg.Wait()  return c.val, c.err  }  c := new(call)  c.wg.Add(1)  g.m[key] = c  g.mu.Unlock()  c.val, c.err = fn()  c.wg.Done()  g.mu.Lock()  delete(g.m, key)  g.mu.Unlock()  return c.val, c.err  
}
  • Do() : 这个函数传入key和函数fn,只要key相同,那么fn函数指挥运行一次
  1. 首先mutex锁,防止g.m并发读写冲突
  2. 检查group的m成员变量是否存在,不存在则初始化
  3. 检查key的请求是否正在执行,如果正在执行,则等待结果并解锁,等待结果后返回结果
  4. 否则如果没有在执行,新建一个call类型表示正在执行
  5. 发起请求前加锁
  6. 将这个call加入到group的m(map)中,表示正在执行
  7. 调用fn
  8. c.wg.Done() 表示请求结束
  9. group的m(map)中从删除call

把singleflight加入主进程group.go

type Group struct {  name      string  getter    Getter  mainCache cache  peers     PeerPicker  loader *singleflight.Group  
}  func NewGroup(name string, cacheBytes int64, getter Getter) *Group {  g := &Group{  loader:    &singleflight.Group{},  }  return g  
}  func (g *Group) load(key string) (value ByteView, err error) {  viewi, err := g.loader.Do(key, func() (interface{}, error) {  if g.peers != nil {  if peer, ok := g.peers.PickPeer(key); ok {  if value, err = g.getFromPeer(peer, key); err == nil {  return value, nil  }  log.Println("[GeeCache] Failed to get from peer", err)  }  }  return g.getLocally(key)  })  if err == nil {  return viewi.(ByteView), nil  }  return  
}
  • 添加Group结构体loader,更新NewGroup
  • 修改load函数,使用匿名函数保证对于同一个key只会执行一次

测试

$ ./run.sh  
2020/02/16 22:36:00 [Server http://localhost:8003] Pick peer http://localhost:8001  
2020/02/16 22:36:00 [Server http://localhost:8001] GET /_geecache/scores/Tom  
2020/02/16 22:36:00 [SlowDB] search key Tom  
630630630

更新后的group.go

// 负责与外部交互,控制缓存存储和获取的主流程
package geecacheimport ("fmt""geecache/singleflight""log""sync"
)type Group struct {name      stringgetter    GettermainCache cachepeers     PeerPickerloader *singleflight.Group
}type Getter interface {Get(key string) ([]byte, error)
}type GetterFunc func(key string) ([]byte, error)func (f GetterFunc) Get(key string) ([]byte, error) {return f(key)
}var (mu     sync.RWMutexgroups = make(map[string]*Group)
)func NewGroup(name string, cacheBytes int64, getter Getter) *Group {if getter == nil {panic("nil Getter")}mu.Lock()defer mu.Unlock()g := &Group{name:      name,getter:    getter,mainCache: cache{cacheBytes: cacheBytes},loader:    &singleflight.Group{},}groups[name] = greturn g
}func GetGroup(name string) *Group {mu.RLock()g := groups[name]mu.RUnlock()return g
}func (g *Group) Get(key string) (ByteView, error) {if key == "" {return ByteView{}, fmt.Errorf("key is required")}if v, ok := g.mainCache.get(key); ok {log.Println("[GeeCache] hit")return v, nil}return g.load(key)
}func (g *Group) getLocally(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneBytes(bytes)}g.populateCache(key, value)return value, nil
}func (g *Group) populateCache(key string, value ByteView) {g.mainCache.add(key, value)
}
func (g *Group) RegisterPeers(peers PeerPicker) {if g.peers != nil {panic("RegisterPeerPicker called more than once")}g.peers = peers
}func (g *Group) load(key string) (value ByteView, err error) {// each key is only fetched once (either locally or remotely)// regardless of the number of concurrent callers.viewi, err := g.loader.Do(key, func() (interface{}, error) {if g.peers != nil {if peer, ok := g.peers.PickPeer(key); ok {if value, err = g.getFromPeer(peer, key); err == nil {return value, nil}log.Println("[GeeCache] Failed to get from peer", err)}}return g.getLocally(key)})if err == nil {return viewi.(ByteView), nil}return
}func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {bytes, err := peer.Get(g.name, key)if err != nil {return ByteView{}, err}return ByteView{b: bytes}, nil
}