etcd Startup Process Walkthrough

Source code: https://github.com/etcd-io/etcd


1. Entry Point

scripts/build.sh
#!/usr/bin/env bash

# This scripts build the etcd binaries
# To build the tools, run `build_tools.sh`

source ./scripts/test_lib.sh
source ./scripts/build_lib.sh

# only build when called directly, not sourced
if echo "$0" | grep -E "build(.sh)?$" >/dev/null; then
  run_build etcd_build
fi

scripts/build_lib.sh

run_build() {
  echo Running "$1"
  if $1; then
    log_success "SUCCESS: $1 (GOARCH=${GOARCH})"
  else
    log_error "FAIL: $1 (GOARCH=${GOARCH})"
    exit 2
  fi
}

etcd_build() {
  out="bin"
  if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
  run rm -f "${out}/etcd"
  (
    cd ./server
    # Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
    # shellcheck disable=SC2086
    run env "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
      -trimpath \
      -installsuffix=cgo \
      "-ldflags=${GO_LDFLAGS[*]}" \
      -o="../${out}/etcd" . || return 2
  ) || return 2

  run rm -f "${out}/etcdutl"
  # shellcheck disable=SC2086
  (
    cd ./etcdutl
    run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
      -trimpath \
      -installsuffix=cgo \
      "-ldflags=${GO_LDFLAGS[*]}" \
      -o="../${out}/etcdutl" . || return 2
  ) || return 2

  run rm -f "${out}/etcdctl"
}

As we can see, the project's startup entry point lives under ./server.

2. Startup Process

server/main.go

func main() {
	etcdmain.Main(os.Args)
}

etcdmain/main.go

func Main(args []string) {
	checkSupportArch()

	if len(args) > 1 {
		cmd := args[1]
		switch cmd {
		case "gateway", "grpc-proxy":
			if err := rootCmd.Execute(); err != nil {
				fmt.Fprint(os.Stderr, err)
				os.Exit(1)
			}
			return
		}
	}

	startEtcdOrProxyV2(args)
}

If one of these subcommands is passed as an argument, etcd starts as a gateway or gRPC proxy instead.

func startEtcdOrProxyV2(args []string) {
	grpc.EnableTracing = false

	cfg := newConfig()
	// initialize logging, cluster information, etc. from the configuration
	defaultInitialCluster := cfg.ec.InitialCluster
	........

	var stopped <-chan struct{}
	var errc <-chan error

	// validate the data directory; a proxy dir and a member data dir must not both be configured
	which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
	if which != dirEmpty {
		lg.Info("server has already been initialized",
			zap.String("data-dir", cfg.ec.Dir),
			zap.String("dir-type", string(which)),
		)
		switch which {
		case dirMember:
			// start etcd
			stopped, errc, err = startEtcd(&cfg.ec)
		case dirProxy:
			lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
		default:
			lg.Panic("unknown directory type",
				zap.String("dir-type", string(which)),
			)
		}
	} else {
		lg.Info("Initialize and start etcd server",
			zap.String("data-dir", cfg.ec.Dir),
			zap.String("dir-type", string(which)),
		)
		stopped, errc, err = startEtcd(&cfg.ec)
	}

	// error handling ...
	osutil.Exit(0)
}

Once initialization is complete, the actual startup flow begins.

etcdmain/etcd.go

// startEtcd runs StartEtcd in addition to the hooks needed for a standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, nil, err
	}
	osutil.RegisterInterruptHandler(e.Close)
	select {
	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
	case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
		e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
	}
	return e.Server.StopNotify(), e.Err(), nil
}
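
etcdmain is only a thin wrapper here; embed.StartEtcd is the same public API you would use to run an embedded etcd inside your own process. Below is a minimal sketch (modeled on the embed package documentation) of calling it directly; the data directory name is arbitrary and chosen only for this example.

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // data directory; arbitrary for this sketch

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown if startup hangs
		log.Println("embedded etcd took too long to start")
	}

	// block until the server stops or reports an error
	log.Fatal(<-e.Err())
}

This is essentially what startEtcdOrProxyV2/startEtcd above do, plus the interrupt handler and the configurable cluster-ready timeout.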

embed/etcd.go

// StartEtcd launches the etcd server and HTTP handlers for client/server communication. The returned Etcd.Server is not guaranteed to have joined the cluster yet; wait on the Etcd.Server.ReadyNotify() channel to know when it has completed and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	// validate the configuration
	if err = inCfg.Validate(); err != nil {
		return nil, err
	}
	serving := false
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
	cfg := &e.cfg
	defer func() {
		if e == nil || err == nil {
			return
		}
		if !serving {
			// errored before starting gRPC server for serveCtx.serversC
			for _, sctx := range e.sctxs {
				close(sctx.serversC)
			}
		}
		e.Close()
		e = nil
	}()

	if !cfg.SocketOpts.Empty() {
		cfg.logger.Info("configuring socket options",
			zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
			zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
		)
	}
	e.cfg.logger.Info("configuring peer listeners",
		zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
	)
	// start the peer listeners that receive messages from other members, i.e. port 2380
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
		return e, err
	}

	e.cfg.logger.Info("configuring client listeners",
		zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
	)
	// start the client listeners, i.e. the default port 2379
	if e.sctxs, err = configureClientListeners(cfg); err != nil {
		return e, err
	}

	for _, sctx := range e.sctxs {
		e.Clients = append(e.Clients, sctx.l)
	}

	var (
		urlsmap types.URLsMap
		token   string
	)
	memberInitialized := true
	// whether the member has already been initialized is determined by whether WAL files exist
	if !isMemberInitialized(cfg) {
		memberInitialized = false
		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
		if err != nil {
			return e, fmt.Errorf("error setting up initial cluster: %v", err)
		}
	}

	// AutoCompactionRetention defaults to "0" if not set.
	if len(cfg.AutoCompactionRetention) == 0 {
		cfg.AutoCompactionRetention = "0"
	}
	// parse the auto-compaction retention; in Revision mode it is taken from the config as-is,
	// in Periodic mode the configured value is interpreted in hours
	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
	if err != nil {
		return e, err
	}

	// boltdb backend freelist type (array/map); this corresponds to the boltdb freelist optimization Alibaba shared earlier
	backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

	srvcfg := config.ServerConfig{
		....
	}

	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
		return e, err
	}

	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

	// newly started member ("memberInitialized==false")
	// does not need corruption check
	if memberInitialized && srvcfg.InitialCorruptCheck {
		if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
			// (nothing to close since rafthttp transports have not been started)
			e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
			e.Server.Cleanup()
			e.Server = nil
			return e, err
		}
	}
	e.Server.Start()

	if err = e.servePeers(); err != nil {
		return e, err
	}
	if err = e.serveClients(); err != nil {
		return e, err
	}
	if err = e.serveMetrics(); err != nil {
		return e, err
	}

	e.cfg.logger.Info("now serving peer/client/metrics",
		zap.String("local-member-id", e.Server.MemberId().String()),
		zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
		zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
		zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
		zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
		zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
	)
	serving = true
	return e, nil
}

StartEtcd converts the configuration, opens the peer and client listeners, and then calls etcdserver.NewServer. Next, let's analyze how the new server is created.

server/etcdserver/server.go

// NewServer creates a new EtcdServer from the supplied configuration. The configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
	b, err := bootstrap(cfg)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			b.Close()
		}
	}()

	sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())

	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
	srv = &EtcdServer{
		readych:               make(chan struct{}),
		Cfg:                   cfg,
		lgMu:                  new(sync.RWMutex),
		lg:                    cfg.Logger,
		errorc:                make(chan error, 1),
		v2store:               b.storage.st,
		snapshotter:           b.ss,
		r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
		memberId:              b.cluster.nodeID,
		attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
		cluster:               b.cluster.cl,
		stats:                 sstats,
		lstats:                lstats,
		SyncTicker:            time.NewTicker(500 * time.Millisecond),
		peerRt:                b.prt,
		reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
		AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
		consistIndex:          b.storage.backend.ci,
		firstCommitInTerm:     notify.NewNotifier(),
		clusterVersionChanged: notify.NewNotifier(),
	}
	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

	srv.be = b.storage.backend.be
	srv.beHooks = b.storage.backend.beHooks
	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
		CheckpointInterval:         cfg.LeaseCheckpointInterval,
		CheckpointPersist:          cfg.LeaseCheckpointPersist,
		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
	})

	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
		func(index uint64) <-chan struct{} {
			return srv.applyWait.Wait(index)
		},
		time.Duration(cfg.TokenTTL)*time.Second,
	)
	if err != nil {
		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
		return nil, err
	}

	mvccStoreConfig := mvcc.StoreConfig{
		CompactionBatchLimit:    cfg.CompactionBatchLimit,
		CompactionSleepInterval: cfg.CompactionSleepInterval,
	}
	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
	srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())

	srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))

	newSrv := srv // since srv == nil in defer if srv is returned as nil
	defer func() {
		// closing backend without first closing kv can cause
		// resumed compactions to fail with closed tx errors
		if err != nil {
			newSrv.kv.Close()
		}
	}()
	if num := cfg.AutoCompactionRetention; num != 0 {
		srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
		if err != nil {
			return nil, err
		}
		srv.compactor.Run()
	}

	if err = srv.restoreAlarms(); err != nil {
		return nil, err
	}
	srv.uberApply = srv.NewUberApplier()

	if srv.Cfg.EnableLeaseCheckpoint {
		// setting checkpointer enables lease checkpoint feature.
		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
		})
	}

	// Set the hook after EtcdServer finishes the initialization to avoid
	// the hook being called during the initialization process.
	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())

	// TODO: move transport initialization near the definition of remote
	tr := &rafthttp.Transport{
		Logger:      cfg.Logger,
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.PeerDialTimeout(),
		ID:          b.cluster.nodeID,
		URLs:        cfg.PeerURLs,
		ClusterID:   b.cluster.cl.ID(),
		Raft:        srv,
		Snapshotter: b.ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil {
		return nil, err
	}
	// add all remotes into transport
	for _, m := range b.cluster.remotes {
		if m.ID != b.cluster.nodeID {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range b.cluster.cl.Members() {
		if m.ID != b.cluster.nodeID {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr

	return srv, nil
}
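
One detail in the listing above is easier to see with concrete numbers: the lessor's minimum lease TTL is derived from the raft tick settings. Below is a minimal sketch (not etcd code) that just reproduces the minTTL arithmetic, assuming the default flags --heartbeat-interval=100 and --election-timeout=1000 (i.e. TickMs=100 and ElectionTicks=10); these defaults are assumptions for the example.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	tickMs := uint(100)  // --heartbeat-interval in milliseconds (assumed default)
	electionTicks := 10  // election timeout expressed in heartbeat ticks (assumed default)

	heartbeat := time.Duration(tickMs) * time.Millisecond
	minTTL := time.Duration((3*electionTicks)/2) * heartbeat
	minLeaseTTL := int64(math.Ceil(minTTL.Seconds()))

	fmt.Println(minTTL)      // 1.5s
	fmt.Println(minLeaseTTL) // 2 -> no lease may have a TTL shorter than 2 seconds
}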

The rough flow of creating the server is:

  • initialize the data storage
  • configure the server parameters
  • create the mvcc service (TODO: detailed analysis)
  • start the transport service
The data storage initialization is handled by bootstrap:

func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		cfg.Logger.Warn(
			"exceeded recommended request limit",
			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
			zap.String("recommended-request-size", recommendedMaxRequestBytesString),
		)
	}

	// create the data directory, snapshot directory, and backend storage
	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
		return nil, fmt.Errorf("cannot access data directory: %v", terr)
	}
	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %v", terr)
	}
	ss := bootstrapSnapshot(cfg)
	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
	if err != nil {
		return nil, err
	}

	haveWAL := wal.Exist(cfg.WALDir())
	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
	if err != nil {
		return nil, err
	}
	var bwal *bootstrappedWAL
	if haveWAL {
		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
		}
		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
	}

	cluster, err := bootstrapCluster(cfg, bwal, prt)
	if err != nil {
		backend.Close()
		return nil, err
	}

	s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
	if err != nil {
		backend.Close()
		return nil, err
	}

	if err = cluster.Finalize(cfg, s); err != nil {
		backend.Close()
		return nil, err
	}

	raft := bootstrapRaft(cfg, cluster, s.wal)
	return &bootstrappedServer{
		prt:     prt,
		ss:      ss,
		storage: s,
		cluster: cluster,
		raft:    raft,
	}, nil
}
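
As noted earlier, both isMemberInitialized in StartEtcd and the haveWAL check here boil down to whether any *.wal files exist under the member's WAL directory. A minimal sketch of that check follows; the import path assumes the current main branch (older 3.5 releases keep the package at go.etcd.io/etcd/server/v3/wal), and the data directory name is hypothetical.

package main

import (
	"fmt"
	"path/filepath"

	// assumed main-branch import path; go.etcd.io/etcd/server/v3/wal in release-3.5
	"go.etcd.io/etcd/server/v3/storage/wal"
)

func main() {
	dataDir := "default.etcd" // hypothetical data directory for this sketch
	walDir := filepath.Join(dataDir, "member", "wal")

	// the same condition bootstrap relies on: a member counts as
	// initialized once WAL files exist in its WAL directory
	fmt.Println("member initialized:", wal.Exist(walDir))
}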