> 文章列表 > GO实现Redis:GO实现Redis的AOF持久化(4)

GO实现Redis:GO实现Redis的AOF持久化(4)

GO实现Redis:GO实现Redis的AOF持久化(4)

  • 将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据
  • https://github.com/csgopher/go-redis
  • 本文涉及以下文件:
    redis.conf:配置文件
    aof:实现aof

redis.conf

appendonly yes
appendfilename appendonly.aof

aof/aof.go

type CmdLine = [][]byteconst (aofQueueSize = 1 << 16
)type payload struct {cmdLine CmdLinedbIndex int
}type AofHandler struct {db          databaseface.DatabaseaofChan     chan *payloadaofFile     *os.FileaofFilename stringcurrentDB   int
}func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {handler := &AofHandler{}handler.aofFilename = config.Properties.AppendFilenamehandler.db = dbhandler.LoadAof()aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)if err != nil {return nil, err}handler.aofFile = aofFilehandler.aofChan = make(chan *payload, aofQueueSize)go func() {handler.handleAof()}()return handler, nil
}func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {if config.Properties.AppendOnly && handler.aofChan != nil {handler.aofChan <- &payload{cmdLine: cmdLine,dbIndex: dbIndex,}}
}func (handler *AofHandler) handleAof() {handler.currentDB = 0for p := range handler.aofChan {if p.dbIndex != handler.currentDB {// select dbdata := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()_, err := handler.aofFile.Write(data)if err != nil {logger.Warn(err)continue}handler.currentDB = p.dbIndex}data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()_, err := handler.aofFile.Write(data)if err != nil {logger.Warn(err)}}
}func (handler *AofHandler) LoadAof() {file, err := os.Open(handler.aofFilename)if err != nil {logger.Warn(err)return}defer file.Close()ch := parser.ParseStream(file)fakeConn := &connection.Connection{}for p := range ch {if p.Err != nil {if p.Err == io.EOF {break}logger.Error("parse error: " + p.Err.Error())continue}if p.Data == nil {logger.Error("empty payload")continue}r, ok := p.Data.(*reply.MultiBulkReply)if !ok {logger.Error("require multi bulk reply")continue}ret := handler.db.Exec(fakeConn, r.Args)if reply.IsErrorReply(ret) {logger.Error("exec err", err)}}
}

AofHandler:1.从管道中接收数据 2.写入AOF文件
AddAof:用户的指令包装成payload放入管道
handleAof:将管道中的payload写入磁盘
LoadAof:重启Redis后加载aof文件

database/database.go

type Database struct {dbSet []*DBaofHandler *aof.AofHandler
}func NewDatabase() *Database {mdb := &Database{}if config.Properties.Databases == 0 {config.Properties.Databases = 16}mdb.dbSet = make([]*DB, config.Properties.Databases)for i := range mdb.dbSet {singleDB := makeDB()singleDB.index = imdb.dbSet[i] = singleDB}if config.Properties.AppendOnly {aofHandler, err := aof.NewAOFHandler(mdb)if err != nil {panic(err)}mdb.aofHandler = aofHandlerfor _, db := range mdb.dbSet {singleDB := dbsingleDB.addAof = func(line CmdLine) {mdb.aofHandler.AddAof(singleDB.index, line)}}}return mdb
}

将AOF加入到database里
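使用singleDB的原因:在 Go 1.22 之前,range 循环的迭代变量 db 在整个循环中是同一个变量,闭包捕获的是这个变量本身而不是它当时的值。如果在 addAof 闭包里直接使用 db,循环结束后所有闭包都会引用最后一个 DB。因此应该先写 singleDB := db,为每次迭代创建独立的副本,再在闭包中使用 singleDB(从 Go 1.22 起每次迭代都会创建新的变量,这种写法不再必需,但保留也没有问题)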

database/db.go

type DB struct {index intdata   dict.DictaddAof func(CmdLine)
}func makeDB() *DB {db := &DB{data:   dict.MakeSyncDict(),addAof: func(line CmdLine) {},}return db
}

由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

database/keys.go

// Excerpts from database/keys.go; "......" marks code the article omits.
// Each handler passes its own command line to db.addAof:
//   - execDel records "del" only when at least one key was deleted.
//   - execFlushDB flushes the database, then records "flushdb".
//   - execRename and execRenameNx record "rename" / "renamenx" just before
//     returning their reply.
func execDel(db *DB, args [][]byte) resp.Reply {......if deleted > 0 {db.addAof(utils.ToCmdLine2("del", args...))}return reply.MakeIntReply(int64(deleted))
}func execFlushDB(db *DB, args [][]byte) resp.Reply {db.Flush()db.addAof(utils.ToCmdLine2("flushdb", args...))return &reply.OkReply{}
}func execRename(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("rename", args...))return &reply.OkReply{}
}func execRenameNx(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("renamenx", args...))return reply.MakeIntReply(1)
}

database/string.go

// Excerpts from database/string.go; "......" marks code the article omits.
// Each handler passes its own command line to db.addAof:
//   - execSet records "set" and execSetNX records "setnx" before returning.
//   - execGetSet looks up the current entity for the key, stores the new
//     value, then records "getset"; the rest of the function is omitted.
func execSet(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("set", args...))return &reply.OkReply{}
}func execSetNX(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("setnx", args...))return reply.MakeIntReply(int64(result))
}func execGetSet(db *DB, args [][]byte) resp.Reply {key := string(args[0])value := args[1]entity, exists := db.GetEntity(key)db.PutEntity(key, &database.DataEntity{Data: value})db.addAof(utils.ToCmdLine2("getset", args...))......
}

在上面这些会修改数据的命令中调用 db.addAof,把执行的命令追加到AOF文件

测试命令

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n