【go项目-geecache】动手写分布式缓存 - day5 - 分布式节点
流程回顾
接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
| 否
|-----> 调用回调函数
,获取值并添加到缓存 --> 返回缓存值 ⑶
我们在[GeeCache 第二天](【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存_CCSU__LRF的博客-CSDN博客) 中描述了 geecache 的流程。在这之前已经实现了流程 ⑴ 和 ⑶,今天实现流程 ⑵,从远程节点获取缓存值。
我们进一步细化流程 ⑵:
使用一致性哈希选择节点 是 是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。
现在我们来实现这个过程
什么是PeerPicker?
在P2P网络(对等网络)中,没有集中的服务器或中央控制器,而是由各个节点共同管理和控制网络。PeerPicker的作用是选择一个可用的节点,以便客户端可以向该节点发出请求并获取所需的数据
抽象PeerPicker peer.go
package geecache type PeerPicker interface { PickPeer(key string) (peer PeerGetter, ok bool)
} type PeerGetter interface { Get(group string, key string) ([]byte, error)
}
- 定义接口PeerPicker,传入一个key返回以个PeerGetter
- PeerGetter从group中获取数据
实现HTTP的客户端
在[Geecache第三天][https://blog.csdn.net/csxylrf/article/details/130222918?spm=1001.2014.3001.5502]我们实现了HTTPool的服务端功能,现在实现客户端功能,用于从Web服务器获取数据
实现HTTP 客户端类 httpGetter
type httpGetter struct { baseURL string
} func (h *httpGetter) Get(group string, key string) ([]byte, error) { u := fmt.Sprintf( "%v%v/%v", h.baseURL, url.QueryEscape(group), url.QueryEscape(key), ) res, err := http.Get(u) if err != nil { return nil, err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("server returned: %v", res.Status) } bytes, err := ioutil.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("reading response body: %v", err) } return bytes, nil
} var _ PeerGetter = (*httpGetter)(nil)
heepGetter
结构体包含一个baseURL
,表示Web服务器的基本URL地址。- 为
heepGetter
实现一个get函数,传入group和key构造出url地址 - 再用http.get()访问URL ,获取状态,如果状态码不是200(即成功)则返回一个错误对象。
- httpGetter结构体从响应体中读取数据,将其转换为字节数组并返回。
最后一行代码 “var _ PeerGetter = (*httpGetter)(nil)” 是一个Go语言的接口赋值语句,表明httpGetter结构体实现了PeerGetter接口。这句代码的作用是确保httpGetter结构体实现了PeerGetter接口的所有方法,以便在编译时进行检查
实现HTTPPool添加节点选择的功能
const ( defaultBasePath = "/_geecache/" defaultReplicas = 50
)
type HTTPPool struct { self string basePath string mu sync.Mutex // guards peers and httpGetters peers *consistenthash.Map httpGetters map[string]*httpGetter // keyed by e.g.
}
peers
:类型是一致性哈希算法的Map
,用来根据具体的 key 选择节点httpGetters
:映射远程节点与对应的 httpGetter。每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址baseURL
有关。
实现HTTPPool的Set接口
func (p *HTTPPool) Set(peers ...string) { p.mu.Lock() defer p.mu.Unlock() p.peers = consistenthash.New(defaultReplicas, nil) p.peers.Add(peers...) p.httpGetters = make(map[string]*httpGetter, len(peers)) for _, peer := range peers { p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath} }
}
Set
接口 : 用于设置HTTPPool结构体的peers字段,即HTTPPool所要连接的节点地址。
- 方法首先获取HTTPPool结构体的互斥锁,以防止其他协程同时访问和修改peers字段。
- 然后,它创建一个 consistenthash.Hash 实例,使用默认的副本数 defaultReplicas 和 nil 的 hash 函数(使用默认的 CRC32)
- 并将传入的 peers 添加到 consistenthash.Hash 实例中。
- 接下来,方法创建一个 map,用于存储 httpGetter 类型的结构体,即要连接的节点信息
- 然后,方法遍历传递进来的可变参数 peers,将每个节点地址作为 key,新建一个 httpGetter 结构体并作为 value 存储到 map 中。
- 在新建 httpGetter 结构体时,使用 peer 地址和 HTTPPool 的 basePath 属性构造 httpGetter 结构体的 baseURL 字段,以便后续访问节点时使用。
实现HTTPPool的PickPeer接口
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) { p.mu.Lock() defer p.mu.Unlock() if peer := p.peers.Get(key); peer != "" && peer != p.self { p.Log("Pick peer %s", peer) return p.httpGetters[peer], true } return nil, false
}
var _ PeerPicker = (*HTTPPool)(nil)
PickerPeer()
包装了一致性哈希算法的 Get()
方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端
新增功能集成在geecache主流程上
type Group struct { name string getter Getter mainCache cache peers PeerPicker
} func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic("RegisterPeerPicker called more than once") } g.peers = peers
} func (g *Group) load(key string) (value ByteView, err error) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { if value, err = g.getFromPeer(peer, key); err == nil { return value, nil } log.Println("[GeeCache] Failed to get from peer", err) } } return g.getLocally(key)
} func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{b: bytes}, nil
}
这段代码是一个缓存框架 GeeCache 中的一部分,其中定义了一个 Group 结构体,该结构体包含了使用缓存所需的各种信息和方法。
具体来说,Group 结构体包含以下字段:
- name:缓存组的名称。
- getter:缓存未命中时获取源数据的回调函数。
- mainCache:实现缓存的主要数据结构 cache。
- peers:实现多节点缓存的 PeerPicker 接口。
Group 结构体定义了两个方法,分别为 RegisterPeers 和 load。
RegisterPeers 方法用于注册 PeerPicker,即将多个节点的 PeerGetter 实例添加到缓存组中,以实现多节点缓存的功能。
load 方法用于从缓存中获取指定 key 的缓存数据。在获取数据时,首先会尝试从 peers 中的节点中获取数据,如果成功则返回数据,否则会从本地缓存中获取数据。如果本地缓存中也没有找到数据,则会调用 getter 回调函数获取源数据,并将源数据添加到缓存中。
在 load 方法中,如果 peers 不为 nil,表示已经注册了 PeerPicker,即启用了多节点缓存。在这种情况下,会通过调用 peers.PickPeer(key) 方法选择节点,再通过调用 getFromPeer 方法从指定节点获取数据。如果获取数据失败,则会打印失败信息,最终会从本地缓存中获取数据。
getFromPeer 方法用于从指定节点获取数据,首先通过调用 peer.Get(g.name, key) 方法获取数据,获取到数据后将其封装为 ByteView 类型并返回。如果获取数据失败,则会返回错误信息。
总之,这段代码实现了 GeeCache 缓存框架中的一些基本功能,包括多节点缓存、本地缓存和缓存数据的获取等。
main 函数测试
var db = map[string]string{ "Tom": "630", "Jack": "589", "Sam": "567",
} func createGroup() *geecache.Group { return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) }))
} func startCacheServer(addr string, addrs []string, gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) peers.Set(addrs...) gee.RegisterPeers(peers) log.Println("geecache is running at", addr) log.Fatal(http.ListenAndServe(addr[7:], peers))
} func startAPIServer(apiAddr string, gee *geecache.Group) { http.Handle("/api", http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") view, err := gee.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/octet-stream") w.Write(view.ByteSlice()) })) log.Println("fontend server is running at", apiAddr) log.Fatal(http.ListenAndServe(apiAddr[7:], nil)) } func main() { var port int var api bool flag.IntVar(&port, "port", 8001, "Geecache server port") flag.BoolVar(&api, "api", false, "Start a api server?") flag.Parse() apiAddr := "http://localhost:9999" addrMap := map[int]string{ 8001: "http://localhost:8001", 8002: "http://localhost:8002", 8003: "http://localhost:8003", } var addrs []string for _, v := range addrMap { addrs = append(addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], []string(addrs), gee)
}
为了方便,我们将启动的命令封装为一个 shell
脚本:
#!/bin/bash
trap "rm server;kill 0" EXIT go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 & sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" & wait