Zookeeper源码分析——Follower和Leader状态同步源码
Follower和Leader状态同步源码
当选举结束后,每个节点都需要根据自己的角色更新自己的状态。选举出的Leader更新自己状态为 Leader,其他节点更新自己状态为 Follower
Leader更新状态入口: leader.lead()
Follower更新状态入口: follower.followLeader()
- follower必须要让 leader知道自己的状态: epoch、 zxid、 sid
必须要找出谁是leader
发起请求连接leader
发送自己的信息给leader
leader接收到信息,必须要返回对应 的信息给 follower
-
当leader得知 follower的状态了,就确定需要做何种方式的数据同步 DIFF、 TRUNC、SNAP
-
执行数据同步
-
当 leader接收到超过半数 follower的 ack之后,进入正常工作状态,集群启动完成了
最终总结同步的方式:
- DIFF 咱俩数据一样(zxid相同),leader只发送一个空的 DIFF,不需要再同步数据
- TRUNC follower的 zxid比 leader的 zxid大,所以 Follower要回滚
- COMMIT leader的 zxid比 follower的 zxid大,发送 Proposal给 follower提交执行
- 如果 follower并没有任何数据,直接使用 SNAP的方式来执行数据同步(直接把数据全部序列到 follower)
Follower和Leader状态同步源码解析
Leader.lead()等待接收follower 的状态同步申请
在 Leader.java 中查找 lead()方法
/**
 * Leader-side entry point after the election (excerpt).
 *
 * <p>Records how long the election took, reloads data, builds the leader's state summary
 * and starts the {@link LearnerCnxAcceptor} thread that waits for learner connections.
 * The remainder of the method is elided in this excerpt.
 *
 * @throws IOException declared by the original signature
 * @throws InterruptedException declared by the original signature
 */
void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        // Restore data into memory; it has in fact already been loaded at startup.
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        // i.e. wait for the other follower nodes to send their sync state to the leader.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        // ... (rest of the method elided in this excerpt)
    } finally {
        zk.unregisterJMX(this);
    }
}

/**
 * Thread that accepts learner connections on the leader's server socket and starts one
 * LearnerHandler thread per accepted connection.
 */
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                Socket s = null;
                boolean error = false;
                try {
                    // Wait for a follower's state-sync request.
                    s = ss.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                    // As soon as a follower request arrives, create a LearnerHandler
                    // to process it.
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    // Start the handler thread.
                    fh.start();
                }
                // ... (catch/finally blocks elided in this excerpt)
            }
        }
        // ... (outer catch elided in this excerpt)
    }

    public void halt() {
        stop = true;
    }
}
其中ss的初始化是在创建 Leader对象时,创建的 socket
// Server socket on which the leader accepts learner connections.
private final ServerSocket ss;

/**
 * Creates the leader and opens its quorum server socket.
 *
 * <p>A UnifiedServerSocket is used when port unification or SSL quorum is enabled,
 * otherwise a plain ServerSocket. When listening on all IPs the socket is created directly
 * on the quorum port; otherwise it is created unbound and bound to the quorum address
 * after SO_REUSEADDR has been set.
 *
 * @param self the local quorum peer
 * @param zk the leader's ZooKeeper server
 * @throws IOException if the socket cannot be created; a BindException is logged and rethrown
 */
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();
    try {
        if (self.shouldUsePortUnification() || self.isSslQuorum()) {
            boolean allowInsecureConnection = self.shouldUsePortUnification();
            if (self.getQuorumListenOnAllIPs()) {
                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
            } else {
                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
            }
        } else {
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
        }
        ss.setReuseAddress(true);
        if (!self.getQuorumListenOnAllIPs()) {
            ss.bind(self.getQuorumAddress());
        }
    } catch (BindException e) {
        if (self.getQuorumListenOnAllIPs()) {
            LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
        } else {
            LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
        }
        throw e;
    }
    this.zk = zk;
    this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}
Follower.followLeader()查找并连接 Leader
在 Follower.java 中查找 followLeader()方法
void followLeader() throws InterruptedException {self.end_fle = Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;self.setElectionTimeTaken(electionTimeTaken);LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,QuorumPeer.FLE_TIME_UNIT);self.start_fle = 0;self.end_fle = 0;fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);try {// 查找 leaderQuorumServer leaderServer = findLeader(); try {// 连接 leaderconnectToLeader(leaderServer.addr, leaderServer.hostname);long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);if (self.isReconfigStateChange())throw new Exception("learned about role change");//check to see if the leader zxid is lower than ours//this should never happen but is just a safety check// 向 leader注册long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);if (newEpoch < self.getAcceptedEpoch()) {LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));throw new IOException("Error: Epoch of leader is lower");}syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket();while (this.isRunning()) {readPacket(qp);processPacket(qp);}} catch (Exception e) {LOG.warn("Exception when following the leader", e);try {sock.close();} catch (IOException e1) {e1.printStackTrace();}// clear pending revalidationspendingRevalidations.clear();}} finally {zk.unregisterJMX((Learner)this);}}protected QuorumServer findLeader() {QuorumServer leaderServer = null;// Find the leader by id// 选举投票的时候记录的,最后推荐的 leader的 sidVote current = self.getCurrentVote();// 如果这个 sid在启动的所有服务器范围中for (QuorumServer s : self.getView().values()) {if (s.id == current.getId()) {// Ensure we have the leader's correct IP address before// attempting to connect.s.recreateSocketAddresses();leaderServer = s;break;}}if (leaderServer == null) {LOG.warn("Couldn't find the leader with id = "+ current.getId());}return leaderServer;}protected void 
connectToLeader(InetSocketAddress addr, String hostname)throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {try {// recalculate the init limit time because retries sleep for 1000 millisecondsremainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);if (remainingInitLimitTime <= 0) {LOG.error("initLimit exceeded on retries.");throw new IOException("initLimit exceeded on retries.");}sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum()) {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;} catch (IOException e) {...Thread.sleep(1000);}self.authLearner.authenticate(sock, hostname);leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}
Leader.lead()创建 LearnerHandler
/**
 * Leader-side entry point after the election (excerpt, repeated here to show where the
 * LearnerHandler is created).
 *
 * <p>Records how long the election took, reloads data, builds the leader's state summary
 * and starts the {@link LearnerCnxAcceptor} thread that waits for learner connections.
 * The remainder of the method is elided in this excerpt.
 *
 * @throws IOException declared by the original signature
 * @throws InterruptedException declared by the original signature
 */
void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
            QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        // Restore data into memory; it has in fact already been loaded at startup.
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        // i.e. wait for the other follower nodes to send their sync state to the leader.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        // ... (rest of the method elided in this excerpt)
    } finally {
        zk.unregisterJMX(this);
    }
}

/**
 * Thread that accepts learner connections on the leader's server socket and starts one
 * LearnerHandler thread per accepted connection.
 */
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                Socket s = null;
                boolean error = false;
                try {
                    // Wait for a follower's state-sync request.
                    s = ss.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
                    // As soon as a follower request arrives, create a LearnerHandler
                    // to process it.
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    // Start the handler thread.
                    fh.start();
                }
                // ... (catch/finally blocks elided in this excerpt)
            }
        }
        // ... (outer catch elided in this excerpt)
    }

    public void halt() {
        stop = true;
    }
}
由于public class LearnerHandler extends ZooKeeperThread{},说明 LearnerHandler是一个线程。所以 fh.start()执行的是 LearnerHandler中的 run()方法。
public void run() {try {leader.addLearnerHandler(this);// 心跳处理tickOfNextAckDeadline = leader.self.tick.get()+ leader.self.initLimit + leader.self.syncLimit;ia = BinaryInputArchive.getArchive(bufferedInput);bufferedOutput = new BufferedOutputStream(sock.getOutputStream());oa = BinaryOutputArchive.getArchive(bufferedOutput);// 从网络中接收消息,并反序列化为 packetQuorumPacket qp = new QuorumPacket();ia.readRecord(qp, "packet");// 选举结束后, observer和 follower都应该给 leader发送一个标志信息:FOLLOWERINFO或者 OBSERVERINFOif(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){LOG.error("First packet " + qp.toString()+ " is not FOLLOWERINFO or OBSERVERINFO!");return;}byte learnerInfoData[] = qp.getData();if (learnerInfoData != null) {ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);if (learnerInfoData.length >= 8) {this.sid = bbsid.getLong();}if (learnerInfoData.length >= 12) {this.version = bbsid.getInt(); // protocolVersion}if (learnerInfoData.length >= 20) {long configVersion = bbsid.getLong();if (configVersion > leader.self.getQuorumVerifier().getVersion()) {throw new IOException("Follower is ahead of the leader (has a later activated configuration)");}}} else {this.sid = leader.followerCounter.getAndDecrement();}if (leader.self.getView().containsKey(this.sid)) {LOG.info("Follower sid: " + this.sid + " : info : "+ leader.self.getView().get(this.sid).toString());} else {LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));}if (qp.getType() == Leader.OBSERVERINFO) {learnerType = LearnerType.OBSERVER;}// 读取 Follower发送过来的 lastAcceptedEpoch // 选举过程中,所使用的 epoch,其实还是上一任 leader的 epochlong lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());long peerLastZxid;StateSummary ss = null;// 读取 follower发送过来的 zxidlong zxid = qp.getZxid();// Leader根据从 Follower获取 sid和旧的 epoch,构建新的 epochlong newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);long newLeaderZxid = 
ZxidUtils.makeZxid(newEpoch, 0);if (this.getVersion() < 0x10000) {// we are going to have to extrapolate the epoch informationlong epoch = ZxidUtils.getEpochFromZxid(zxid);ss = new StateSummary(epoch, zxid);// fake the messageleader.waitForEpochAck(this.getSid(), ss);} else {byte ver[] = new byte[4];ByteBuffer.wrap(ver).putInt(0x10000);// Leader向 Follower发送信息(包含 :zxid和 newEpoch)QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);oa.writeRecord(newEpochPacket, "packet");bufferedOutput.flush();QuorumPacket ackEpochPacket = new QuorumPacket();ia.readRecord(ackEpochPacket, "packet");if (ackEpochPacket.getType() != Leader.ACKEPOCH) {LOG.error(ackEpochPacket.toString()+ " is not ACKEPOCH");return;}ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());leader.waitForEpochAck(this.getSid(), ss);}peerLastZxid = ss.getLastZxid();// Take any necessary action if we need to send TRUNC or DIFF// startForwarding() will be called in all casesboolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);/* if we are not truncating or sending a diff just send a snapshot */if (needSnap) {boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);try {long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");bufferedOutput.flush();LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "+ "send zxid of db as 0x{}, {} concurrent snapshots, " + "snapshot was {} from throttle",Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid),Long.toHexString(zxidToSend), snapshot.getConcurrentSnapshotNumber(),snapshot.isEssential() ? 
"exempt" : "not exempt");// Dump data to peerleader.zk.getZKDatabase().serializeSnapshot(oa);oa.writeString("BenWasHere", "signature");bufferedOutput.flush();} finally {snapshot.close();}}LOG.debug("Sending NEWLEADER message to " + sid);// the version of this quorumVerifier will be set by leader.lead() in case// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if// we got here, so the version was setif (getVersion() < 0x10000) {QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, null, null);oa.writeRecord(newLeaderQP, "packet");} else {QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);}bufferedOutput.flush();// Start thread that blast packets in the queue to learnerstartSendingPackets();/ Have to wait for the first ACK, wait until* the leader is ready, and only then we can* start processing messages.*/qp = new QuorumPacket();ia.readRecord(qp, "packet");if(qp.getType() != Leader.ACK){LOG.error("Next packet was supposed to be an ACK,"+ " but received packet: {}", packetToString(qp));return;}if(LOG.isDebugEnabled()){LOG.debug("Received NEWLEADER-ACK message from " + sid); }leader.waitForNewLeaderAck(getSid(), qp.getZxid());syncLimitCheck.start();// now that the ack has been processed expect the syncLimitsock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);/ Wait until leader starts up*/synchronized(leader.zk){while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}}// Mutation packets will be queued during the serialize,// so we need to mark when the peer can actually start// using the data//LOG.debug("Sending UPTODATE message to " + sid); queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));while (true) {qp = new QuorumPacket();ia.readRecord(qp, "packet");long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;if (qp.getType() == 
Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);}tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;ByteBuffer bb;long sessionId;int cxid;int type;switch (qp.getType()) {case Leader.ACK:if (this.learnerType == LearnerType.OBSERVER) {if (LOG.isDebugEnabled()) {LOG.debug("Received ACK from Observer " + this.sid);}}syncLimitCheck.updateAck(qp.getZxid());leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());break;case Leader.PING:// Process the touchesByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);while (dis.available() > 0) {long sess = dis.readLong();int to = dis.readInt();leader.zk.touch(sess, to);}break;case Leader.REVALIDATE:bis = new ByteArrayInputStream(qp.getData());dis = new DataInputStream(bis);long id = dis.readLong();int to = dis.readInt();ByteArrayOutputStream bos = new ByteArrayOutputStream();DataOutputStream dos = new DataOutputStream(bos);dos.writeLong(id);boolean valid = leader.zk.checkIfValidGlobalSession(id, to);if (valid) {try {//set the session owner// as the follower that// owns the sessionleader.zk.setOwner(id, this);} catch (SessionExpiredException e) {LOG.error("Somehow session " + Long.toHexString(id) +" expired right after being renewed! 
(impossible)", e);}}if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(id)+ " is valid: "+ valid);}dos.writeBoolean(valid);qp.setData(bos.toByteArray());queuedPackets.add(qp);break;case Leader.REQUEST:bb = ByteBuffer.wrap(qp.getData());sessionId = bb.getLong();cxid = bb.getInt();type = bb.getInt();bb = bb.slice();Request si;if(type == OpCode.sync){si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());} else {si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());}si.setOwner(this);leader.zk.submitLearnerRequest(si);break;default:LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));break;}}} catch (IOException e) {...}}
Follower.followLeader()调用 registerWithLeader 向 Leader注册
void followLeader() throws InterruptedException {self.end_fle = Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;self.setElectionTimeTaken(electionTimeTaken);LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,QuorumPeer.FLE_TIME_UNIT);self.start_fle = 0;self.end_fle = 0;fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);try {// 查找 leaderQuorumServer leaderServer = findLeader(); try {// 连接leaderconnectToLeader(leaderServer.addr, leaderServer.hostname);// 向leader注册long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);if (self.isReconfigStateChange())throw new Exception("learned about role change");//check to see if the leader zxid is lower than ours//this should never happen but is just a safety checklong newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);if (newEpoch < self.getAcceptedEpoch()) {LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));throw new IOException("Error: Epoch of leader is lower");}syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket();// 循环等待接收消息while (this.isRunning()) {// 读取 packet信息readPacket(qp);// 处理 packet消息processPacket(qp);}} catch (Exception e) {LOG.warn("Exception when following the leader", e);try {sock.close();} catch (IOException e1) {e1.printStackTrace();}// clear pending revalidationspendingRevalidations.clear();}} finally {zk.unregisterJMX((Learner)this);}}protected long registerWithLeader(int pktType) throws IOException{/ Send follower info, including last zxid and sid*/long lastLoggedZxid = self.getLastLoggedZxid();QuorumPacket qp = new QuorumPacket(); qp.setType(pktType);qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));/ Add sid to payload*/LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());ByteArrayOutputStream bsid = new ByteArrayOutputStream();BinaryOutputArchive boa = 
BinaryOutputArchive.getArchive(bsid);boa.writeRecord(li, "LearnerInfo");qp.setData(bsid.toByteArray());// 发送 FollowerInfo给 LeaderwritePacket(qp, true);// 读取 Leader返回的结果: LeaderInforeadPacket(qp); final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());// 如果接收到 LeaderInfoif (qp.getType() == Leader.LEADERINFO) {// we are connected to a 1.0 server so accept the new epoch and read the next packetleaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();byte epochBytes[] = new byte[4];final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);// 接收 leader的 epochif (newEpoch > self.getAcceptedEpoch()) {// 把自己原来的 epoch保存在 wrappedEpochBytes里wrappedEpochBytes.putInt((int)self.getCurrentEpoch());// 把 leader发送过来的 epoch保存起来self.setAcceptedEpoch(newEpoch);} else if (newEpoch == self.getAcceptedEpoch()) {// since we have already acked an epoch equal to the leaders, we cannot ack// again, but we still need to send our lastZxid to the leader so that we can// sync with it if it does assume leadership of the epoch.// the -1 indicates that this reply should not count as an ack for the new epochwrappedEpochBytes.putInt(-1);} else {throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());}// 发送 ackepoch给 leader(包含了自己的 epoch和 zxid)QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);writePacket(ackNewEpoch, true);return ZxidUtils.makeZxid(newEpoch, 0);} else {if (newEpoch > self.getAcceptedEpoch()) {self.setAcceptedEpoch(newEpoch);}if (qp.getType() != Leader.NEWLEADER) {LOG.error("First packet should have been NEWLEADER");throw new IOException("First packet should have been NEWLEADER");}return qp.getZxid();}}
Leader.lead()接收 Follower状态, 根据同步方式发送同步消息
public void run() {try {leader.addLearnerHandler(this);// 心跳处理tickOfNextAckDeadline = leader.self.tick.get()+ leader.self.initLimit + leader.self.syncLimit;ia = BinaryInputArchive.getArchive(bufferedInput);bufferedOutput = new BufferedOutputStream(sock.getOutputStream());oa = BinaryOutputArchive.getArchive(bufferedOutput);// 从网络中接收消息,并反序列化为 packetQuorumPacket qp = new QuorumPacket();ia.readRecord(qp, "packet");// 选举结束后, observer和 follower都应该给 leader发送一个标志信息:FOLLOWERINFO或者 OBSERVERINFOif(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){LOG.error("First packet " + qp.toString()+ " is not FOLLOWERINFO or OBSERVERINFO!");return;}byte learnerInfoData[] = qp.getData();if (learnerInfoData != null) {ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);if (learnerInfoData.length >= 8) {this.sid = bbsid.getLong();}if (learnerInfoData.length >= 12) {this.version = bbsid.getInt(); // protocolVersion}if (learnerInfoData.length >= 20) {long configVersion = bbsid.getLong();if (configVersion > leader.self.getQuorumVerifier().getVersion()) {throw new IOException("Follower is ahead of the leader (has a later activated configuration)");}}} else {this.sid = leader.followerCounter.getAndDecrement();}if (leader.self.getView().containsKey(this.sid)) {LOG.info("Follower sid: " + this.sid + " : info : "+ leader.self.getView().get(this.sid).toString());} else {LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));}if (qp.getType() == Leader.OBSERVERINFO) {learnerType = LearnerType.OBSERVER;}// 读取 Follower发送过来的 lastAcceptedEpoch // 选举过程中,所使用的 epoch,其实还是上一任 leader的 epochlong lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());long peerLastZxid;StateSummary ss = null;// 读取 follower发送过来的 zxidlong zxid = qp.getZxid();// Leader根据从 Follower获取 sid和旧的 epoch,构建新的 epochlong newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);long newLeaderZxid = 
ZxidUtils.makeZxid(newEpoch, 0);if (this.getVersion() < 0x10000) {// we are going to have to extrapolate the epoch informationlong epoch = ZxidUtils.getEpochFromZxid(zxid);ss = new StateSummary(epoch, zxid);// fake the messageleader.waitForEpochAck(this.getSid(), ss);} else {byte ver[] = new byte[4];ByteBuffer.wrap(ver).putInt(0x10000);// Leader向 Follower发送信息(包含 :zxid和 newEpoch)QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);oa.writeRecord(newEpochPacket, "packet");bufferedOutput.flush();// 接收到 Follower应答的 ackepochQuorumPacket ackEpochPacket = new QuorumPacket();ia.readRecord(ackEpochPacket, "packet");if (ackEpochPacket.getType() != Leader.ACKEPOCH) {LOG.error(ackEpochPacket.toString()+ " is not ACKEPOCH");return;}ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());// 保存了对方 follower或者 observer的状态: epoch和 zxidss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());leader.waitForEpochAck(this.getSid(), ss);}peerLastZxid = ss.getLastZxid();// Take any necessary action if we need to send TRUNC or DIFF// startForwarding() will be called in all cases// 方法判断 Leader和 Follower是否需 要同步boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);/* if we are not truncating or sending a diff just send a snapshot */if (needSnap) {boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;LearnerSnapshot snapshot = leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);try {long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");bufferedOutput.flush();LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "+ "send zxid of db as 0x{}, {} concurrent snapshots, " + "snapshot was {} from throttle",Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid),Long.toHexString(zxidToSend), snapshot.getConcurrentSnapshotNumber(),snapshot.isEssential() ? 
"exempt" : "not exempt");// Dump data to peerleader.zk.getZKDatabase().serializeSnapshot(oa);oa.writeString("BenWasHere", "signature");bufferedOutput.flush();} finally {snapshot.close();}}LOG.debug("Sending NEWLEADER message to " + sid);// the version of this quorumVerifier will be set by leader.lead() in case// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if// we got here, so the version was setif (getVersion() < 0x10000) {QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, null, null);oa.writeRecord(newLeaderQP, "packet");} else {QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);}...}} catch (IOException e) {...}}public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {/ When leader election is completed, the leader will set its* lastProcessedZxid to be (epoch < 32). There will be no txn associated* with this zxid. The learner will set its lastProcessedZxid to the same value if* it get DIFF or SNAP from the leader. If the same learner come* back to sync with leader using this zxid, we will never find this* zxid in our history. 
In this case, we will ignore TRUNC logic and* always send DIFF if we have old enough history*/boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;// Keep track of the latest zxid which already queuedlong currentZxid = peerLastZxid;boolean needSnap = true;boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();ReentrantReadWriteLock lock = db.getLogLock();ReadLock rl = lock.readLock();try {rl.lock();long maxCommittedLog = db.getmaxCommittedLog();long minCommittedLog = db.getminCommittedLog();long lastProcessedZxid = db.getDataTreeLastProcessedZxid();LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"+ " peerLastZxid=0x{}", getSid(),Long.toHexString(maxCommittedLog),Long.toHexString(minCommittedLog),Long.toHexString(lastProcessedZxid),Long.toHexString(peerLastZxid));if (db.getCommittedLog().isEmpty()) {/ It is possible that committedLog is empty. In that case* setting these value to the latest txn in leader db* will reduce the case that we need to handle Here is how each case handle by the if block below* 1. lastProcessZxid == peerZxid -> Handle by (2)* 2. lastProcessZxid < peerZxid -> Handle by (3)* 3. lastProcessZxid > peerZxid -> Handle by (5)*/minCommittedLog = lastProcessedZxid;maxCommittedLog = lastProcessedZxid;}/ Here are the cases that we want to handle 1. Force sending snapshot (for testing purpose)* 2. Peer and leader is already sync, send empty diff* 3. Follower has txn that we haven't seen. This may be old leader* so we need to send TRUNC. However, if peer has newEpochZxid,* we cannot send TRUNC since the follower has no txnlog* 4. Follower is within committedLog range or already in-sync.* We may need to send DIFF or TRUNC depending on follower's zxid* We always send empty DIFF if follower is already in-sync* 5. Follower missed the committedLog. We will try to use on-disk* txnlog + committedLog to sync with follower. 
If that fail,* we will send snapshot*/if (forceSnapSync) {// Force leader to use snapshot to sync with followerLOG.warn("Forcing snapshot sync - should not see this in production");} else if (lastProcessedZxid == peerLastZxid) {// Follower is already sync with us, send empty diffLOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +" for peer sid: " + getSid());queueOpPacket(Leader.DIFF, peerLastZxid);needOpPacket = false;needSnap = false;} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {// Newer than committedLog, send trunc and doneLOG.debug("Sending TRUNC to follower zxidToSend=0x" +Long.toHexString(maxCommittedLog) +" for peer sid:" + getSid());queueOpPacket(Leader.TRUNC, maxCommittedLog);currentZxid = maxCommittedLog;needOpPacket = false;needSnap = false;} else if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) {// Follower is within commitLog rangeLOG.info("Using committedLog for peer sid: " + getSid());Iterator<Proposal> itr = db.getCommittedLog().iterator();currentZxid = queueCommittedProposals(itr, peerLastZxid,null, maxCommittedLog);needSnap = false;} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {// Use txnlog and committedLog to sync// Calculate sizeLimit that we allow to retrieve txnlog from disklong sizeLimit = db.calculateTxnLogSizeLimit();// This method can return empty iterator if the requested zxid// is older than on-disk txnlogIterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);if (txnLogItr.hasNext()) {LOG.info("Use txnlog and committedLog for peer sid: " + getSid());currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,minCommittedLog, maxCommittedLog);LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();currentZxid = queueCommittedProposals(committedLogItr, currentZxid,null, maxCommittedLog);needSnap = false;}// closing the resourcesif (txnLogItr 
instanceof TxnLogProposalIterator) {TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;txnProposalItr.close();}} else {LOG.warn("Unhandled scenario for peer sid: " + getSid());}LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +" for peer sid: " + getSid());leaderLastZxid = leader.startForwarding(this, currentZxid);} finally {rl.unlock();}if (needOpPacket && !needSnap) {// This should never happen, but we should fall back to sending// snapshot just in case.LOG.error("Unhandled scenario for peer sid: " + getSid() +" fall back to use snapshot");needSnap = true;}return needSnap;}
Follower.followLeader()应答 Leader同步结果
protected void processPacket(QuorumPacket qp) throws Exception{switch (qp.getType()) {case Leader.PING: ping(qp); break;case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);if (hdr.getZxid() != lastQueued + 1) {LOG.warn("Got zxid 0x"+ Long.toHexString(hdr.getZxid())+ " expected 0x"+ Long.toHexString(lastQueued + 1));}lastQueued = hdr.getZxid();if (hdr.getType() == OpCode.reconfig){SetDataTxn setDataTxn = (SetDataTxn) txn; QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));self.setLastSeenQuorumVerifier(qv, true); }fzk.logRequest(hdr, txn);break;case Leader.COMMIT:fzk.commit(qp.getZxid());break;case Leader.COMMITANDACTIVATE:// get the new configuration from the requestRequest request = fzk.pendingTxns.element();SetDataTxn setDataTxn = (SetDataTxn) request.getTxn(); QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData())); // get new designated leader from (current) leader's messageByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong();boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);// commit (writes the new config to ZK tree (/zookeeper/config) fzk.commit(qp.getZxid());if (majorChange) {throw new Exception("changes proposed in reconfig");}break;case Leader.UPTODATE:LOG.error("Received an UPTODATE message after Follower started");break;case Leader.REVALIDATE:revalidate(qp);break;case Leader.SYNC:fzk.sync();break;default:LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));break;}}public void commit(long zxid) {if (pendingTxns.size() == 0) {LOG.warn("Committing " + Long.toHexString(zxid)+ " without seeing txn");return;}long firstElementZxid = pendingTxns.element().zxid;if (firstElementZxid != zxid) {LOG.error("Committing zxid 0x" + Long.toHexString(zxid)+ " but next pending txn 0x"+ Long.toHexString(firstElementZxid));System.exit(12);}Request request = 
pendingTxns.remove();commitProcessor.commit(request);}
Leader.lead()应答 Follower
由于public class LearnerHandler extends ZooKeeperThread{},说明 LearnerHandler是一个线程。所以 fh.start()执行的是 LearnerHandler中的 run()方法。
public void run() {……//LOG.debug("Sending UPTODATE message to " + sid);queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));while (true) {……}
} catch (IOException e) {......
} finally {......
}
}