Zookeeper Source Code Analysis: ZK Server Initialization

Persistence source code

Snapshots

public interface SnapShot {

    /**
     * Deserialize a data tree from the last valid snapshot and
     * return the last zxid that was deserialized.
     */
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

    /**
     * Persist the data tree and the sessions into persistent storage (serialization).
     */
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;

    /**
     * Find the most recent snapshot file.
     */
    File findMostRecentSnapshot() throws IOException;

    /**
     * Free resources from this snapshot immediately.
     */
    void close() throws IOException;
}
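
Here is a small illustration of the decision findMostRecentSnapshot() has to make. ZooKeeper names snapshot files snapshot.<zxid in hex>, so "most recent" means "largest zxid". The sketch below only parses file names and picks the maximum. SnapshotNames and its methods are hypothetical, and the real FileSnap also checks that a candidate file is a valid snapshot before using it.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not ZooKeeper's FileSnap): picks the newest snapshot by the
// zxid encoded in its file name, assuming the "snapshot.<hex zxid>" convention.
final class SnapshotNames {
    static final String PREFIX = "snapshot.";

    // Parses the hex zxid from "snapshot.<hex>"; returns -1 for any other name.
    static long zxidOf(String name) {
        if (!name.startsWith(PREFIX)) {
            return -1L;
        }
        try {
            return Long.parseLong(name.substring(PREFIX.length()), 16);
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    // Returns the snapshot name with the largest zxid; non-snapshot files are ignored.
    static Optional<String> mostRecent(List<String> names) {
        String best = null;
        long bestZxid = -1L;
        for (String name : names) {
            long zxid = zxidOf(name);
            if (zxid > bestZxid) {
                bestZxid = zxid;
                best = name;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> files = List.of("snapshot.100000000", "log.100000001", "snapshot.2000000a5");
        System.out.println(mostRecent(files).orElse("none"));
    }
}
```

The sketch compares parsed numbers rather than raw names for a reason. As text, snapshot.ff sorts after snapshot.100, even though 0xff is the smaller zxid.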

Transaction log

public interface TxnLog {

    /** Setter for ServerStats to monitor fsync threshold exceed (sets the server stats). */
    void setServerStats(ServerStats serverStats);

    /** Roll the current log being appended to. */
    void rollLog() throws IOException;

    /** Append a request to the transaction log; returns true iff something was appended, otherwise false. */
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /** Start reading the transaction logs from the given zxid. */
    TxnIterator read(long zxid) throws IOException;

    /** The last zxid of the logged transactions. */
    long getLastLoggedZxid() throws IOException;

    /** Truncate the log to get in sync with the leader (deletes log entries). */
    boolean truncate(long zxid) throws IOException;

    /** @return the dbid for this transaction log. */
    long getDbId() throws IOException;

    /** Commit the transaction and make sure it is persisted. */
    void commit() throws IOException;

    /** @return transaction log's elapsed sync time in milliseconds */
    long getTxnLogSyncElapsedTime();

    /** Close the transaction logs. */
    void close() throws IOException;

    /** An iterating interface for reading transaction logs. */
    public interface TxnIterator {

        /** Return the transaction header. */
        TxnHeader getHeader();

        /** Return the transaction record (the payload). */
        Record getTxn();

        /** Go to the next transaction record. */
        boolean next() throws IOException;

        /** Close files and release the resources. */
        void close() throws IOException;

        /** Get an estimated storage space used to store the transaction records returned by this iterator. */
        long getStorageSize() throws IOException;
    }
}
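
The append / read(zxid) / truncate(zxid) contract is easier to see without files. The following in-memory sketch is not FileTxnLog. InMemoryTxnLog is hypothetical, records are plain strings, and there is no fsync, log rolling, or on-disk format. The sketch makes three assumptions:

- zxids arrive in increasing order; this sketch simply rejects an out-of-order append.
- read(zxid) starts at the first transaction whose zxid is at least the requested one.
- truncate(zxid) discards everything logged after zxid, which is what a follower does to get back in sync with the leader.

```java
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for a transaction log, keyed by zxid.
final class InMemoryTxnLog {
    private final NavigableMap<Long, String> txns = new TreeMap<>();

    // Appends a transaction; this sketch returns false if zxid is not newer than the last one.
    boolean append(long zxid, String record) {
        if (!txns.isEmpty() && zxid <= txns.lastKey()) {
            return false;
        }
        txns.put(zxid, record);
        return true;
    }

    // Iterates, in zxid order, over the transactions whose zxid is >= the given zxid.
    Iterator<String> read(long zxid) {
        return txns.tailMap(zxid, true).values().iterator();
    }

    // Last logged zxid, or 0 if the log is empty (a choice made for this sketch).
    long getLastLoggedZxid() {
        return txns.isEmpty() ? 0L : txns.lastKey();
    }

    // Drops every transaction logged after zxid.
    boolean truncate(long zxid) {
        txns.tailMap(zxid, false).clear();
        return true;
    }
}
```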

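Core classes that handle persistence: FileTxnLog implements TxnLog, FileSnap implements SnapShot, and FileTxnSnapLog wraps both and is the class the server works with.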
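
FileTxnSnapLog ties the transaction log and the snapshot together. Its recovery idea can be sketched in two steps: start from the state in the newest snapshot, then replay every logged transaction whose zxid is greater than the snapshot's. In the sketch below, the "data tree" is a hypothetical Map<String, String> and each transaction is a hypothetical set-path operation. Sessions and the other details of the real restore path are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of snapshot-plus-log recovery; not FileTxnSnapLog.restore itself.
final class RestoreSketch {

    // A logged transaction that sets `path` to `value` at the given zxid.
    static final class Txn {
        final long zxid;
        final String path;
        final String value;

        Txn(long zxid, String path, String value) {
            this.zxid = zxid;
            this.path = path;
            this.value = value;
        }
    }

    // Starts from a copy of the snapshot state and replays, in log order,
    // only the transactions newer than the snapshot's zxid.
    static Map<String, String> restore(Map<String, String> snapshot, long snapshotZxid, List<Txn> log) {
        Map<String, String> tree = new HashMap<>(snapshot);
        for (Txn txn : log) {
            if (txn.zxid > snapshotZxid) {
                tree.put(txn.path, txn.value);
            }
        }
        return tree;
    }
}
```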

Serialization source code

The zookeeper-jute module contains ZooKeeper's serialization source code.

Serialization and deserialization interface

public interface Record {
    public void serialize(OutputArchive archive, String tag) throws IOException;
    public void deserialize(InputArchive archive, String tag) throws IOException;
}
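
Classes generated by Jute implement Record by writing their fields in a fixed order between startRecord and endRecord, then reading them back in the same order. In the binary format the tag argument is ignored. The sketch below imitates that pattern with a hypothetical ServerInfo record and two cut-down archive interfaces (MiniOutputArchive, MiniInputArchive) backed by DataOutputStream and DataInputStream, so it runs without zookeeper-jute. It uses writeUTF for strings purely for brevity; that is not Jute's string encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Minimal, hypothetical stand-ins for OutputArchive / InputArchive (two methods each).
interface MiniOutputArchive {
    void writeLong(long l, String tag) throws IOException;
    void writeString(String s, String tag) throws IOException;
}

interface MiniInputArchive {
    long readLong(String tag) throws IOException;
    String readString(String tag) throws IOException;
}

// Hypothetical record with the same serialize/deserialize shape as Record.
final class ServerInfo {
    long sid;
    String host;

    void serialize(MiniOutputArchive a, String tag) throws IOException {
        a.writeLong(sid, "sid");       // fields are written in a fixed order...
        a.writeString(host, "host");
    }

    void deserialize(MiniInputArchive a, String tag) throws IOException {
        sid = a.readLong("sid");       // ...and read back in exactly the same order
        host = a.readString("host");
    }

    // Serializes to bytes and reads them back; tags are ignored, as in a binary archive.
    static ServerInfo roundTrip(ServerInfo original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.serialize(new MiniOutputArchive() {
                public void writeLong(long l, String tag) throws IOException { out.writeLong(l); }
                public void writeString(String s, String tag) throws IOException { out.writeUTF(s); }
            }, "info");
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            ServerInfo copy = new ServerInfo();
            copy.deserialize(new MiniInputArchive() {
                public long readLong(String tag) throws IOException { return in.readLong(); }
                public String readString(String tag) throws IOException { return in.readUTF(); }
            }, "info");
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```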

Iteration interface

public interface Index {
    public boolean done();
    public void incr();
}
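
Index lets generated deserialize() code walk a vector or map element by element. In the binary format it works like this:

- The element count is written first; -1 stands for a null list.
- startVector reads the count and returns an Index that counts down.
- The loop runs until done().

The sketch below reproduces that loop. Idx, CountingIndex and VectorCodec are hypothetical names, not BinaryInputArchive itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Same shape as Jute's Index interface.
interface Idx {
    boolean done();
    void incr();
}

// Counts down the remaining elements, the way a binary-format index can.
final class CountingIndex implements Idx {
    private int remaining;

    CountingIndex(int n) { remaining = n; }

    public boolean done() { return remaining <= 0; }

    public void incr() { remaining--; }
}

final class VectorCodec {
    // Writes an int count (-1 for null) followed by each element.
    static byte[] write(List<Integer> list) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (list == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(list.size());
                for (int v : list) {
                    out.writeInt(v);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the list back with the startVector / done() / incr() loop pattern.
    static List<Integer> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int len = in.readInt();          // what startVector reads first
            if (len == -1) {
                return null;                 // a null vector
            }
            List<Integer> result = new ArrayList<>();
            for (Idx idx = new CountingIndex(len); !idx.done(); idx.incr()) {
                result.add(in.readInt());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```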

Data types supported for serialization

public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag) throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List<?> v, String tag) throws IOException;
    public void endVector(List<?> v, String tag) throws IOException;
    public void startMap(TreeMap<?,?> v, String tag) throws IOException;
    public void endMap(TreeMap<?,?> v, String tag) throws IOException;
}
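
ZooKeeper's BinaryOutputArchive shows what these methods produce in practice:

- an int is written as 4 big-endian bytes;
- a byte buffer is written as a 4-byte length followed by the bytes, with length -1 for null;
- a string is written as its UTF-8 bytes in that same buffer layout.

The hypothetical LengthPrefixedWriter below re-implements just those three methods on DataOutputStream so the resulting bytes are visible.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical writer imitating BinaryOutputArchive's length-prefixed encoding.
final class LengthPrefixedWriter {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // DataOutputStream does not buffer, so bytes are visible in `bytes` immediately.
    private final DataOutputStream out = new DataOutputStream(bytes);

    // 4-byte big-endian int; the tag is ignored in a binary format.
    void writeInt(int i, String tag) {
        try {
            out.writeInt(i);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Length prefix (-1 for null) followed by the raw bytes.
    void writeBuffer(byte[] buf, String tag) {
        if (buf == null) {
            writeInt(-1, "len");
            return;
        }
        writeInt(buf.length, "len");
        try {
            out.write(buf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A string is encoded as its UTF-8 bytes using the buffer layout.
    void writeString(String s, String tag) {
        writeBuffer(s == null ? null : s.getBytes(StandardCharsets.UTF_8), tag);
    }

    byte[] toByteArray() {
        return bytes.toByteArray();
    }
}
```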

Data types supported for deserialization

public interface InputArchive {
    public byte readByte(String tag) throws IOException;
    public boolean readBool(String tag) throws IOException;
    public int readInt(String tag) throws IOException;
    public long readLong(String tag) throws IOException;
    public float readFloat(String tag) throws IOException;
    public double readDouble(String tag) throws IOException;
    public String readString(String tag) throws IOException;
    public byte[] readBuffer(String tag) throws IOException;
    public void readRecord(Record r, String tag) throws IOException;
    public void startRecord(String tag) throws IOException;
    public void endRecord(String tag) throws IOException;
    public Index startVector(String tag) throws IOException;
    public void endVector(String tag) throws IOException;
    public Index startMap(String tag) throws IOException;
    public void endMap(String tag) throws IOException;
}
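
The reading side mirrors the length-prefixed layout: read the 4-byte length, treat -1 as null, then read exactly that many bytes. BinaryInputArchive also rejects unreasonable lengths against a limit derived from the jute.maxbuffer system property. The hypothetical LengthPrefixedReader below imitates that guard with an explicit maxLen parameter instead of the real property lookup.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical reader for length-prefixed buffers and strings.
final class LengthPrefixedReader {
    private final DataInputStream in;
    private final int maxLen; // stand-in for the jute.maxbuffer-based limit

    LengthPrefixedReader(byte[] data, int maxLen) {
        this.in = new DataInputStream(new ByteArrayInputStream(data));
        this.maxLen = maxLen;
    }

    // Reads a length prefix, then that many bytes; -1 means null.
    byte[] readBuffer(String tag) {
        try {
            int len = in.readInt();
            if (len == -1) {
                return null;
            }
            if (len < 0 || len > maxLen) {
                throw new IllegalStateException("Unreasonable length = " + len);
            }
            byte[] buf = new byte[len];
            in.readFully(buf);
            return buf;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes the buffer as UTF-8, preserving null.
    String readString(String tag) {
        byte[] b = readBuffer(tag);
        return b == null ? null : new String(b, StandardCharsets.UTF_8);
    }
}
```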

ZK server initialization source code analysis

ZK server startup script analysis

The Zookeeper service is started with the command zkServer.sh start.

ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh    # sources zkEnv.sh to pick up its environment variables (e.g. ZOOCFG="zoo.cfg")
fi
...

What zkServer.sh start actually executes:

nohup "$JAVA" + a set of launch options
+ $ZOOMAIN  (resolves to org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG"  (the config file name, set by ZOOCFG in zkEnv.sh)

So the program's entry point is the QuorumPeerMain.java class.

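ZK server startup entry point: QuorumPeerMain.main() creates a QuorumPeerMain instance and calls initializeAndRun(args).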

initializeAndRun

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    // Holds the ZooKeeper configuration
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // Parse the arguments: zoo.cfg and myid
        config.parse(args[0]);
    }

    // Start and schedule the purge task:
    // a scheduled task that deletes expired snapshots (disabled by default)
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // Start in cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only one server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
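
The final branch of initializeAndRun picks cluster or standalone mode. In the 3.5.x QuorumPeerConfig, isDistributed() is roughly "a quorum verifier was parsed, and either standaloneEnabled is false or there is more than one voting member". The hypothetical StartupMode class below models that decision with plain parameters instead of a real QuorumPeerConfig. Under this model, a single-server zoo.cfg lands in standalone mode unless standaloneEnabled=false.

```java
// Hypothetical model of the mode decision at the end of initializeAndRun.
final class StartupMode {
    enum Mode { QUORUM, STANDALONE }

    // Approximates config.isDistributed(): a quorum config exists and either
    // standalone mode is disabled or more than one voting member is configured.
    static boolean isDistributed(boolean hasQuorumVerifier, boolean standaloneEnabled, int votingMembers) {
        return hasQuorumVerifier && (!standaloneEnabled || votingMembers > 1);
    }

    // Cluster mode needs exactly one argument (the config path) and a distributed config.
    static Mode choose(int argCount, boolean hasQuorumVerifier, boolean standaloneEnabled, int votingMembers) {
        return argCount == 1 && isDistributed(hasQuorumVerifier, standaloneEnabled, votingMembers)
                ? Mode.QUORUM
                : Mode.STANDALONE;
    }
}
```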

Parsing the arguments: zoo.cfg and myid

QuorumPeerConfig.java

/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    // Read each property from zoo.cfg and assign it to the matching field of this QuorumPeerConfig
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        }
        ...
    }
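
parse(path) loads zoo.cfg into a java.util.Properties. parseProperties then trims each key and value and dispatches on the key name, as in the excerpt above. A cut-down sketch of that loop handles only dataDir, clientPort and tickTime, using a hypothetical MiniConfig class. In this sketch dataDir is kept as a String rather than a File, and every other key is ignored.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;

// Hypothetical, cut-down version of the key-dispatch loop in parseProperties.
final class MiniConfig {
    String dataDir;
    int clientPort;
    int tickTime;

    static MiniConfig parse(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        MiniConfig c = new MiniConfig();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            // Trim both sides, as the original does; Properties keeps trailing spaces in values.
            String key = entry.getKey().toString().trim();
            String value = entry.getValue().toString().trim();
            if (key.equals("dataDir")) {
                c.dataDir = value;
            } else if (key.equals("clientPort")) {
                c.clientPort = Integer.parseInt(value);
            } else if (key.equals("tickTime")) {
                c.tickTime = Integer.parseInt(value);
            }
            // server.N, initLimit, syncLimit, ... are ignored in this sketch.
        }
        return c;
    }
}
```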

QuorumPeerConfig.java

    void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
        quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
        setupMyId();
        setupClientPort();
        setupPeerType();
        checkValidity();
    }

    private void setupMyId() throws IOException {
        File myIdFile = new File(dataDir, "myid");
        // standalone server doesn't need myid file.
        if (!myIdFile.isFile()) {
            return;
        }
        BufferedReader br = new BufferedReader(new FileReader(myIdFile));
        String myIdString;
        try {
            myIdString = br.readLine();
        } finally {
            br.close();
        }
        try {
            // Parse the id from the myid file and assign it to serverId
            serverId = Long.parseLong(myIdString);
            MDC.put("myid", myIdString);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + myIdString
                    + " is not a number");
        }
    }
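
setupMyId() does three things:

- reads the first line of <dataDir>/myid as the server id;
- silently skips a missing file, since standalone servers have none;
- rejects a non-numeric value.

The sketch below reproduces that logic with java.nio.file. MyIdReader is hypothetical and returns an OptionalLong instead of setting serverId. Like the original, it does not trim the line, so a value such as "1 " would be rejected.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

// Hypothetical re-implementation of the setupMyId() logic.
final class MyIdReader {
    // Returns the server id from <dataDir>/myid, or empty if there is no such file.
    static OptionalLong readMyId(Path dataDir) {
        Path myIdFile = dataDir.resolve("myid");
        if (!Files.isRegularFile(myIdFile)) {
            return OptionalLong.empty(); // standalone servers do not need a myid file
        }
        String line;
        try (BufferedReader br = Files.newBufferedReader(myIdFile, StandardCharsets.UTF_8)) {
            line = br.readLine(); // only the first line is used
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            return OptionalLong.of(Long.parseLong(line));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("serverid " + line + " is not a number");
        }
    }
}
```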

Purging expired snapshots

A scheduled task can be started to delete expired snapshots. This feature is disabled by default.

QuorumPeerMain.java

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    // Holds the ZooKeeper configuration
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // 1. Parse the arguments: zoo.cfg and myid
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    // 2. Start a scheduled task that deletes expired snapshots (disabled by default)
    //    config.getSnapRetainCount() = 3: minimum number of snapshots to keep
    //    config.getPurgeInterval() = 0: the default of 0 means the task is disabled
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3. Start in cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only one server in the quorum -- run as standalone
        // (local mode)
        ZooKeeperServerMain.main(args);
    }
}

DatadirCleanupManager.java

Following the scheduled task's start method:

A static nested class, PurgeTask, implements the scheduled snapshot-purge task.

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // By default purgeInterval = 0, so the task is disabled and start() returns here.
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    // Create a timer
    timer = new Timer("PurgeTask", true);
    // Create the snapshot-purge task
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    // With purgeInterval = 1 the task runs once an hour and deletes expired snapshots, if any
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

/**
 * Shutdown the purge task.
 */
public void shutdown() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.info("Shutting down purge task.");
        timer.cancel();
        purgeTaskStatus = PurgeTaskStatus.COMPLETED;
    } else {
        LOG.warn("Purge task not started. Ignoring shutdown!");
    }
}

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // Purge the expired data
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
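
Stripped of ZooKeeper types, start() is a scheduling decision. It does nothing when purgeInterval is 0 or negative (the default). Otherwise it runs the task immediately and then every purgeInterval hours on a daemon Timer. In zoo.cfg these values come from the autopurge.purgeInterval and autopurge.snapRetainCount settings. PurgeScheduler below is a hypothetical sketch, and its Runnable stands in for PurgeTxnLog.purge(...).

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified version of the DatadirCleanupManager.start() decision.
final class PurgeScheduler {
    private Timer timer;

    // Schedules `purge` now and then every purgeIntervalHours hours. Returns false when the
    // interval is <= 0 (the default 0 means "disabled") or when a task is already running.
    synchronized boolean start(int purgeIntervalHours, Runnable purge) {
        if (timer != null || purgeIntervalHours <= 0) {
            return false;
        }
        timer = new Timer("PurgeTask", true); // daemon thread: does not keep the JVM alive
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    purge.run();
                } catch (RuntimeException e) {
                    // Keep the timer alive: an escaping exception would kill its thread.
                    System.err.println("Error occurred while purging: " + e);
                }
            }
        }, 0, TimeUnit.HOURS.toMillis(purgeIntervalHours));
        return true;
    }

    synchronized void shutdown() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }
}
```

Catching exceptions inside the task, as PurgeTask.run() does, matters with java.util.Timer. An exception that escapes a TimerTask terminates the timer thread, and all later runs are lost.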

Next, the method that actually purges the expired data:

PurgeTxnLog.java

    /**
     * Purges the snapshot and logs keeping the last num snapshots and the
     * corresponding logs. If logs are rolling or a new snapshot is created
     * during this process, these newest N snapshots or any data logs will be
     * excluded from current purging cycle.
     */
    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }

        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

        List<File> snaps = txnLog.findNRecentSnapshots(num);
        int numSnaps = snaps.size();
        if (numSnaps > 0) {
            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
        }
    }
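
The retention rule in purge() can be sketched on bare zxids:

1. Keep the num newest snapshots; num must be at least 3.
2. Treat the zxid of the oldest kept snapshot as the cut-off.
3. Delete every snapshot below the cut-off.

SnapshotRetention is hypothetical and deliberately ignores transaction log files. The real purgeOlderSnapshots also keeps the log files that the retained snapshots still need for replay.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of which snapshots purge() would remove.
final class SnapshotRetention {
    // Returns, in ascending order, the snapshot zxids deleted when keeping the newest `num`.
    static List<Long> toDelete(List<Long> snapshotZxids, int num) {
        if (num < 3) {
            throw new IllegalArgumentException("count should be greater than or equal to 3");
        }
        List<Long> newestFirst = new ArrayList<>(snapshotZxids);
        newestFirst.sort(Collections.reverseOrder());
        if (newestFirst.size() <= num) {
            return new ArrayList<>(); // nothing is old enough to purge
        }
        long leastZxidToRetain = newestFirst.get(num - 1); // oldest snapshot that is kept
        List<Long> result = new ArrayList<>();
        for (long zxid : snapshotZxids) {
            if (zxid < leastZxidToRetain) {
                result.add(zxid);
            }
        }
        Collections.sort(result);
        return result;
    }
}
```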

Initializing the communication component

The communication layer uses NIO by default (Netty is also supported).

Stepping into the method that starts the cluster, runFromConfig:

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Initialize the communication component (NIO by default)
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        // Copy the parsed settings onto this ZooKeeper node
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());

        // Manages storage of the ZooKeeper data
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();

        // Manages ZooKeeper communication
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // Start ZooKeeper
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

The concrete implementation of communication initialization

ServerCnxnFactory.java

static public ServerCnxnFactory createFactory() throws IOException {
    // The implementation class comes from the zookeeper.serverCnxnFactory system property
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        // Default: NIO
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        // Instantiate the chosen class reflectively through its no-arg constructor
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}

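The zookeeperAdmin.md documentation describes this switch. The zookeeper.serverCnxnFactory system property selects the ServerCnxnFactory implementation; it must be set to NettyServerCnxnFactory to use TLS-based server communication.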
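
createFactory() follows a small pluggable-factory pattern. It reads a class name from a system property, falls back to a default implementation, and instantiates the class reflectively through a no-arg constructor. The sketch below applies the same pattern to hypothetical ConnFactory classes and a hypothetical demo.serverCnxnFactory property, so it runs without ZooKeeper on the classpath.

```java
// Hypothetical stand-ins for ServerCnxnFactory and two implementations.
interface ConnFactory {
    String name();
}

class NioConnFactory implements ConnFactory {
    public NioConnFactory() {}
    public String name() { return "nio"; }
}

class NettyConnFactory implements ConnFactory {
    public NettyConnFactory() {}
    public String name() { return "netty"; }
}

final class ConnFactories {
    static final String PROPERTY = "demo.serverCnxnFactory"; // hypothetical property name

    // Reads the implementation class from a system property, defaulting to NIO.
    static ConnFactory create() {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            className = NioConnFactory.class.getName();
        }
        try {
            return (ConnFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

With ZooKeeper itself, the switch is made on the JVM command line, for example -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.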

Initializing the NIO server socket (not yet started)

public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores/2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    LOG.info("Configuring NIO connection handler with "
             + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
             + " timeout, " + numSelectorThreads + " selector thread(s), "
             + (numWorkerThreads > 0 ? numWorkerThreads : "no")
             + " worker threads, and "
             + (directBufferBytes == 0 ? "gathered writes." :
                ("" + (directBufferBytes/1024) + " kB direct buffers.")));
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    // Open the NIO server socket that will accept client requests on the client port (2181 by default)
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    // Bind the client port (2181 by default)
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
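
The default thread counts in configure() are plain arithmetic on the number of cores:

- numSelectorThreads = max((int) sqrt(numCores / 2), 1)
- numWorkerThreads = 2 * numCores

Each can be overridden through a system property. The hypothetical NioThreadDefaults helper below reproduces just the two formulas, without the property lookup. For 32 cores it gives sqrt(16) = 4 selector threads, matching the comment in the source. A single core still gets one selector thread because of the max(..., 1).

```java
// Hypothetical helper reproducing the default NIO thread-count formulas from configure().
final class NioThreadDefaults {
    // sqrt(cores / 2) computed in floating point, truncated toward zero, but at least 1.
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    // Two worker threads per core.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[]{1, 4, 8, 16, 32}) {
            System.out.println(cores + " cores: " + selectorThreads(cores)
                    + " selector thread(s), " + workerThreads(cores) + " worker threads");
        }
    }
}
```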