
Zookeeper Source Code Analysis: How the ZK Server Loads Its Data


ZK Server Data Loading


(1) The data model inside ZooKeeper is a tree, the DataTree; each node in the tree is called a DataNode.
(2) The DataTree is kept in sync across all servers in the ZK ensemble at all times.
(3) Every server in a ZooKeeper ensemble holds a complete copy of the data, both in memory and on disk (see the sketch after this list).
⚫ In-memory data: the DataTree
⚫ On-disk data: snapshot files + edit (transaction) logs
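To make the on-disk half concrete, here is a minimal sketch that lists the files a server recovers from. It assumes the conventional layout in which FileTxnSnapLog keeps snapshot.<zxid> and log.<zxid> files under a version-2 subdirectory of the data directory; the default path used below is only an example.

import java.io.File;

// Minimal sketch: list the on-disk data a ZK server recovers from.
// Assumes the conventional <dataDir>/version-2 layout holding
// snapshot.<zxid> and log.<zxid> files; adjust the path to your setup.
public class ListZkDataFiles {
    public static void main(String[] args) {
        File dataDir = new File(args.length > 0 ? args[0] : "/tmp/zookeeper", "version-2");
        File[] files = dataDir.listFiles();
        if (files == null) {
            System.out.println("No data directory at " + dataDir);
            return;
        }
        for (File f : files) {
            String name = f.getName();
            if (name.startsWith("snapshot.")) {
                System.out.println("snapshot file : " + name); // serialized DataTree image
            } else if (name.startsWith("log.")) {
                System.out.println("edit log file : " + name); // transactions appended after that zxid
            }
        }
    }
}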

ZK Server Initialization


Starting the cluster

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Initialize the communication layer; NIO is the default
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
        }

        // Copy the parsed configuration onto this ZooKeeper node
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());

        // ZKDatabase manages this node's data storage
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();

        // Wire up the communication factories
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // Start this ZooKeeper peer
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
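For orientation, the sketch below shows one way this method can be reached: parse a zoo.cfg into a QuorumPeerConfig and hand it to runFromConfig() on a QuorumPeerMain instance. This is a simplified stand-in for the real bootstrap path (QuorumPeerMain.main() goes through initializeAndRun()), and the config file path is a placeholder.

import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;

// Simplified sketch of the bootstrap path, not the upstream main():
// parse the config file and hand it to runFromConfig(), which builds
// and starts the QuorumPeer as shown above. "conf/zoo.cfg" is a placeholder.
public class StartQuorumPeerSketch {
    public static void main(String[] args) throws Exception {
        QuorumPeerConfig config = new QuorumPeerConfig();
        config.parse(args.length > 0 ? args[0] : "conf/zoo.cfg");

        QuorumPeerMain main = new QuorumPeerMain();
        main.runFromConfig(config); // blocks in quorumPeer.join() until shutdown
    }
}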

Cold-start data recovery

QuorumPeer.java

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Cold-start data recovery
    loadDataBase();
    // Start the communication (connection) factory
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the leader-election environment
    startLeaderElection();
    // Run the election
    super.start();
}

Looking at the concrete data-recovery method

private void loadDataBase() {
    try {
        // Load data from disk into memory and rebuild the DataTree.
        // ZK operations come in two kinds: transactional operations (e.g. zk.create(),
        // each of which is assigned a globally unique zxid) and non-transactional
        // operations (e.g. zk.getData()).
        // A zxid is 64 bits: the high 32 bits are the epoch (one per leader term),
        // the low 32 bits are the transaction id (txid) within that epoch.
        // Recovery proceeds in three steps:
        //   1) restore most of the data from the snapshot file and obtain a lastProcessedZxid
        //   2) replay the edit log from there to its last entry, updating lastProcessedZxid
        //   3) the resulting DataTree plus lastProcessedZxid means recovery is complete
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                    + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)
                    + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
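The comments at the top of loadDataBase() describe how a zxid packs the leader epoch and the per-epoch transaction counter into a single 64-bit value. The sketch below mirrors what ZxidUtils.getEpochFromZxid() does with plain bit operations; the sample zxid is made up for illustration.

// Minimal sketch of the zxid layout described above: high 32 bits = epoch,
// low 32 bits = per-epoch transaction counter. The sample value is made up.
public class ZxidLayoutSketch {
    static long epochOf(long zxid)   { return zxid >> 32; }          // like ZxidUtils.getEpochFromZxid
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }  // like ZxidUtils.getCounterFromZxid

    public static void main(String[] args) {
        long zxid = 0x0000000200000007L; // epoch 2, 7th transaction of that epoch
        System.out.printf("zxid=0x%x epoch=%d counter=%d%n",
                zxid, epochOf(zxid), counterOf(zxid));
    }
}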

The method that loads disk data into memory and rebuilds the DataTree


Drilling into the restore method

/* this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @return the highest zxid restored
 * @throws IOException
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Restore the snapshot data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    RestoreFinalizer finalizer = () -> {
        // Replay the edit (transaction) log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };
    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }
        /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
         *       or use Map on save() */
        save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
        /* return a zxid of zero, since we the database is empty */
        return 0;
    }
    return finalizer.run();
}

Looking at the deserialization implementation

/**
 * deserialize a data tree from the most recent snapshot
 * @return the zxid of the snapshot
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    // Walk through the candidate snapshots one by one
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        // Set up the deserialization environment
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // Deserialize, restoring the data into the DataTree
            deserialize(dt, sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot :  " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}

/* deserialize the datatree from an inputarchive
 * @param dt the datatree to be serialized into
 * @param sessions the sessions to be filled up
 * @param ia the input archive to restore from
 * @throws IOException
 */
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers "
                + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
    }
    // Restore the snapshot data into the DataTree
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}

public static void deserializeSnapshot(DataTree dt, InputArchive ia,
        Map<Long, Integer> sessions) throws IOException {
    int count = ia.readInt("count");
    while (count > 0) {
        long id = ia.readLong("id");
        int to = ia.readInt("timeout");
        sessions.put(id, to);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id + " with timeout: " + to);
        }
        count--;
    }
    // Restore the snapshot data into the DataTree
    dt.deserialize(ia, "tree");
}

public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");
    // Restore each DataNode from the snapshot into the DataTree
    while (!"/".equals(path)) {
        // Create one node object per loop iteration
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        // Put the DataNode into the DataTree
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            // Handle the parent node
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find "
                        + "parent " + parentPath + " of path " + path);
            }
            // Register this node as a child of its parent
            parent.addChild(path.substring(lastSlash + 1));
            // Distinguish ephemeral, container, TTL and persistent nodes
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // we are done with deserializing the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();
    aclCache.purgeUnused();
}

Cold-start recovery: replaying the edit log

Back to the restore method in FileTxnSnapLog.java

public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Restore the snapshot data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    RestoreFinalizer finalizer = () -> {
        // Replay the edit (transaction) log into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };
    if (-1L == deserializeResult) {
        /* this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325) */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }
        /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
         *       or use Map on save() */
        save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
        /* return a zxid of zero, since we the database is empty */
        return 0;
    }
    return finalizer.run();
}

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Most of the data has already been restored from the snapshot;
    // replay only needs to start at the snapshot's zxid + 1
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    // Highest zxid so far (initially the snapshot's); it keeps advancing
    // while the edit log is replayed, until every logged operation is applied
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        // Starting from lastProcessedZxid, keep applying the transactions
        // in the edit log that have not been recovered yet
        while (true) {
            // iterator points to the first valid txn when initialized
            // Read the transaction header (which carries the zxid)
            hdr = itr.getHeader();
            if (hdr == null) {
                // empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // Apply one logged transaction to the DataTree; each applied
                // transaction advances the replayed zxid
                processTransaction(hdr, dt, sessions, itr.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: "
                        + hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}

public void processTransaction(TxnHeader hdr, DataTree dt,
        Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    case OpCode.createSession:
        sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- create session in log: 0x"
                            + Long.toHexString(hdr.getClientId())
                            + " with timeout: " + ((CreateSessionTxn) txn).getTimeOut());
        }
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- close session in log: 0x"
                            + Long.toHexString(hdr.getClientId()));
        }
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        // Node creation, deletion and all other transactional operations
        rc = dt.processTxn(hdr, txn);
    }
    /* Snapshots are lazily created. So when a snapshot is in progress,
     * there is a chance for later transactions to make into the
     * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
     * errors could occur. It should be safe to ignore these. */
    if (rc.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}",
                hdr.getType(), rc.err, rc.path);
    }
}

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0,
                    create2Txn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
                    EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                    createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
                    createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                    createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        ....
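Putting the pieces together, the following is a minimal sketch that drives this recovery path directly: build a FileTxnSnapLog over the data directories, give it a fresh DataTree and session map, and supply a PlayBackListener that just prints each replayed transaction (ZKDatabase normally supplies a listener that records them for later leader/follower syncing). The directory paths are placeholders.

import java.io.File;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

// Minimal sketch of the recovery path shown above. The directories are
// placeholders; the lambda is a stand-in PlayBackListener that only logs
// each replayed transaction.
public class RestoreSketch {
    public static void main(String[] args) throws Exception {
        File dataLogDir = new File("/tmp/zookeeper"); // placeholder, holds log.<zxid>
        File dataDir = new File("/tmp/zookeeper");    // placeholder, holds snapshot.<zxid>

        FileTxnSnapLog snapLog = new FileTxnSnapLog(dataLogDir, dataDir);
        DataTree dataTree = new DataTree();
        ConcurrentHashMap<Long, Integer> sessions = new ConcurrentHashMap<>();

        // restore() = deserialize the newest valid snapshot, then replay the
        // edit log from lastProcessedZxid + 1; it returns the highest zxid applied.
        long highestZxid = snapLog.restore(dataTree, sessions,
                (hdr, txn) -> System.out.println("replayed zxid 0x" + Long.toHexString(hdr.getZxid())));

        System.out.println("restored up to zxid 0x" + Long.toHexString(highestZxid));
        System.out.println("sessions recovered: " + sessions.size());
    }
}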