> 文章列表 > Zookeeper源码分析——ZK选举源码解析

Zookeeper源码分析——ZK选举源码解析

Zookeeper源码分析——ZK选举源码解析

ZK选举源码解析

Zookeeper选举机制——第一次启动

Zookeeper源码分析——ZK选举源码解析

Zookeeper选举机制——非第一次启动

Zookeeper源码分析——ZK选举源码解析

ZK选举源码解析

Zookeeper源码分析——ZK选举源码解析

ZK选举准备源码解析

Zookeeper源码分析——ZK选举源码解析

public synchronized void start() {if (!getView().containsKey(myid)) {throw new RuntimeException("My id " + myid + " not in the peer list");}loadDataBase();startServerCnxnFactory();try {adminServer.start();} catch (AdminServerException e) {LOG.warn("Problem starting AdminServer", e);System.out.println(e);}startLeaderElection();super.start();}synchronized public void startLeaderElection() {try {if (getPeerState() == ServerState.LOOKING) {// 创建选票// 1)选票组件 epoch leader的任期代号)、 zxid(某个 leader当选期间执行的事务编号)、 myid serverid// 2)开始选票时,都是先投自己currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());}} catch(IOException e) {RuntimeException re = new RuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;}// if (!getView().containsKey(myid)) {//      throw new RuntimeException("My id " + myid + " not in the peer list");//}if (electionType == 0) {try {udpSocket = new DatagramSocket(getQuorumAddress().getPort());responder = new ResponderThread();responder.start();} catch (SocketException e) {throw new RuntimeException(e);}}// 创建 选举算法实例this.electionAlg = createElectionAlgorithm(electionType);}protected Election createElectionAlgorithm(int electionAlgorithm){Election le=null;//TODO: use a factory rather than a switchswitch (electionAlgorithm) {case 0:le = new LeaderElection(this);break;case 1:le = new AuthFastLeaderElection(this);break;case 2:le = new AuthFastLeaderElection(this, true);break;case 3:// 1创建 QuorumCnxnManager,负责选举过程中的所有 网络通信QuorumCnxManager qcm = createCnxnManager();QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);if (oldQcm != null) {LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");oldQcm.halt();}QuorumCnxManager.Listener listener = qcm.listener;if(listener != null){// 2启动监听线程listener.start();// 3准备开始选举FastLeaderElection fle = new FastLeaderElection(this, qcm);fle.start();le = fle;} else {LOG.error("Null listener when initializing cnx manager");}break;default:assert false;}return le;}

网络通信组件初始化

    public QuorumCnxManager createCnxnManager() {return new QuorumCnxManager(this,this.getId(),this.getView(),this.authServer,this.authLearner,this.tickTime * this.syncLimit,this.getQuorumListenOnAllIPs(),this.quorumCnxnThreadsSize,this.isQuorumSaslAuthEnabled());}public QuorumCnxManager(QuorumPeer self,final long mySid,Map<Long,QuorumPeer.QuorumServer> view,QuorumAuthServer authServer,QuorumAuthLearner authLearner,int socketTimeout,boolean listenOnAllIPs,int quorumCnxnThreadsSize,boolean quorumSaslAuthEnabled) {// 创建各种队列this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();String cnxToValue = System.getProperty("zookeeper.cnxTimeout");if(cnxToValue != null){this.cnxTO = Integer.parseInt(cnxToValue);}this.self = self;this.mySid = mySid;this.socketTimeout = socketTimeout;this.view = view;this.listenOnAllIPs = listenOnAllIPs;initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,quorumSaslAuthEnabled);// Starts listener thread that waits for connection requestslistener = new Listener();listener.setName("QuorumPeerListener");}

监听线程初始化

点击QuorumCnxManager.Listener,找到对应的 run方法

public void run() {int numRetries = 0;InetSocketAddress addr;Socket client = null;Exception exitException = null;while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {try {if (self.shouldUsePortUnification()) {LOG.info("Creating TLS-enabled quorum server socket");ss = new UnifiedServerSocket(self.getX509Util(), true);} else if (self.isSslQuorum()) {LOG.info("Creating TLS-only quorum server socket");ss = new UnifiedServerSocket(self.getX509Util(), false);} else {ss = new ServerSocket();}ss.setReuseAddress(true);if (self.getQuorumListenOnAllIPs()) {int port = self.getElectionAddress().getPort();addr = new InetSocketAddress(port);} else {// Resolve hostname for this server in case the// underlying ip address has changed.self.recreateSocketAddresses(self.getId());addr = self.getElectionAddress();}LOG.info("My election bind port: " + addr.toString());setName(addr.toString());// 绑定服务器地址ss.bind(addr);// 死循环while (!shutdown) {try {client = ss.accept();setSockOpts(client);LOG.info("Received connection request "+ formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));// Receive and handle the connection request// asynchronously if the quorum sasl authentication is// enabled. This is required because sasl server// authentication process may take few seconds to finish,// this may delay next peer connection requests.if (quorumSaslAuthEnabled) {receiveConnectionAsync(client);} else {receiveConnection(client);}numRetries = 0;} catch (SocketTimeoutException e) {LOG.warn("The socket is listening for the election accepted "+ "and it timed out unexpectedly, but will retry."+ "see ZOOKEEPER-2836");}}... }}}

选举准备

点击FastLeaderElection

 public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){this.stop = false;this.manager = manager;starter(self, manager);}private void starter(QuorumPeer self, QuorumCnxManager manager) {this.self = self;proposedLeader = -1;proposedZxid = -1;// 初始化队列和信息sendqueue = new LinkedBlockingQueue<ToSend>();recvqueue = new LinkedBlockingQueue<Notification>();this.messenger = new Messenger(manager);}    

选举执行源码分析

Zookeeper源码分析——ZK选举源码解析

QuorumPeer.java

public synchronized void start() {if (!getView().containsKey(myid)) {throw new RuntimeException("My id " + myid + " not in the peer list");}// 冷启动数据恢复loadDataBase();startServerCnxnFactory();try {// 启动通信工厂实例对象adminServer.start();} catch (AdminServerException e) {LOG.warn("Problem starting AdminServer", e);System.out.println(e);}// 准备选举环境startLeaderElection();// 执行选举super.start();}

执行super.start();就相当于执行QuorumPeer.java 类中的run()方法
当Zookeeper 启动后,首先都是Looking 状态,通过选举,让其中一台服务器成为Leader,
其他的服务器成为Follower。

    public void run() {updateThreadName();LOG.debug("Starting quorum peer");try {jmxQuorumBean = new QuorumBean(this);MBeanRegistry.getInstance().register(jmxQuorumBean, null);for(QuorumServer s: getView().values()){ZKMBeanInfo p;if (getId() == s.id) {p = jmxLocalPeerBean = new LocalPeerBean(this);try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxLocalPeerBean = null;}} else {RemotePeerBean rBean = new RemotePeerBean(this, s);try {MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);jmxRemotePeerBean.put(s.id, rBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);}}}} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxQuorumBean = null;}try {/** Main loop*/while (running) {switch (getPeerState()) {case LOOKING:LOG.info("LOOKING");if (Boolean.getBoolean("readonlymode.enabled")) {LOG.info("Attempting to start ReadOnlyZooKeeperServer");// Create read-only server but don't start it immediatelyfinal ReadOnlyZooKeeperServer roZk =new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);// Instead of starting roZk immediately, wait some grace// period before we decide we're partitioned.//// Thread is used here because otherwise it would require// changes in each of election strategy classes which is// unnecessary code coupling.Thread roZkMgr = new Thread() {public void run() {try {// lower-bound grace period to 2 secssleep(Math.max(2000, tickTime));if (ServerState.LOOKING.equals(getPeerState())) {roZk.startup();}} catch (InterruptedException e) {LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");} catch (Exception e) {LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);}}};try {roZkMgr.start();reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE = false;startLeaderElection();}// 进行选举,选举结束,返回最终成为 Leader胜选的那张选票setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);} finally {// If the thread is in the the grace period, interrupt// to come out of waiting.roZkMgr.interrupt();roZk.shutdown();}} else {try {reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE = false;startLeaderElection();}setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);}                        }break;case OBSERVING:try {LOG.info("OBSERVING");setObserver(makeObserver(logFactory));observer.observeLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e );} finally {observer.shutdown();setObserver(null);  updateServerState();}break;case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory));follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {follower.shutdown();setFollower(null);updateServerState();}break;case LEADING:LOG.info("LEADING");try {setLeader(makeLeader(logFactory));leader.lead();setLeader(null);} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {if (leader != null) {leader.shutdown("Forcing shutdown");setLeader(null);}updateServerState();}break;}start_fle = Time.currentElapsedTime();}} finally {...}}

点击 lookForLeader()的实现类 FastLeaderElection.java

public Vote lookForLeader() throws InterruptedException {try {self.jmxLeaderElectionBean = new LeaderElectionBean();MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);self.jmxLeaderElectionBean = null;}if (self.start_fle == 0) {self.start_fle = Time.currentElapsedTime();}try {// 正常启动中,所有其他服务器,都会给我发送一个投票// 保存每一个服务器的最新合法有效的投票		HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();// 存储合法选举之外的投票结果HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();// 一次选举的最大等待时间,默认值是 0.2sint notTimeout = finalizeWait;// 每发起一轮选举, logicalclock++ // 在没有合法的 epoch数据之前,都使用逻辑时钟代替// 选举 leader的规则:依次比较 epoch(任期 zxid(事务 id serveridmyid 谁大谁当选 leadersynchronized(this){// 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟logicalclock.incrementAndGet();// 更新选票( serverid zxid, epoch)updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}LOG.info("New election. My id =  " + self.getId() +", proposed zxid=0x" + Long.toHexString(proposedZxid));// 广播选票,把自己的选票发给其他服务器sendNotifications();/** Loop in which we exchange notifications until we find a leader*/// 一轮一轮的选举 直到选举成功while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){.....return null;} finally {...}}

点击 sendNotifications 广播选票,把自己的选票发给其他服务器

/*** Send notifications to all peers upon a change in our vote*/private void sendNotifications() {// 遍历投票参与者,给每台服务器发送选票for (long sid : self.getCurrentAndNextConfigVoters()) {QuorumVerifier qv = self.getQuorumVerifier();ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());if(LOG.isDebugEnabled()){LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +" (n.round), " + sid + " (recipient), " + self.getId() +" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");}// 把发送选票放入发送队列sendqueue.offer(notmsg);}}

在 FastLeaderElection.java类中查找 WorkerSender线程。

class WorkerSender extends ZooKeeperThread {volatile boolean stop;QuorumCnxManager manager;WorkerSender(QuorumCnxManager manager){super("WorkerSender");this.stop = false;this.manager = manager;}public void run() {while (!stop) {try {// 队列阻塞,时刻准备接收要发送的选票ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if(m == null) continue;// 处理要发送的选票process(m);} catch (InterruptedException e) {break;}}LOG.info("WorkerSender is down");}/*** Called by run() once there is a new message to send.** @param m     message to send*/void process(ToSend m) {ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),m.leader,m.zxid,m.electionEpoch,m.peerEpoch,m.configData);// 发送选票manager.toSend(m.sid, requestBuffer);}}public void toSend(Long sid, ByteBuffer b) {/** If sending message to myself, then simply enqueue it (loopback).*/// 判断如果是发给自己的消息,直接进入自己的 RecvQueueif (this.mySid == sid) {b.position(0);addToRecvQueue(new Message(b.duplicate(), sid));/** Otherwise send to the corresponding thread to send.*/} else {/** Start a new connection if doesn't have one already.*/// 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队列// 并把要发送的消息放入该队列ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);if (oldq != null) {addToSendQueue(oldq, b);} else {addToSendQueue(bq, b);}// 将选票发送出去connectOne(sid);}}

如果数据是发送给自己的,添加到自己的接收队列

public void addToRecvQueue(Message msg) {synchronized(recvQLock) {if (recvQueue.remainingCapacity() == 0) {try {recvQueue.remove();} catch (NoSuchElementException ne) {// element could be removed by poll()LOG.debug("Trying to remove from an empty " +"recvQueue. Ignoring exception " + ne);}}try {// 将发送给自己的选票添加到 recvQueue队列recvQueue.add(msg);} catch (IllegalStateException ie) {// This should never happenLOG.error("Unable to insert element in the recvQueue " + ie);}}}

数据添加到发送队列

private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,ByteBuffer buffer) {if (queue.remainingCapacity() == 0) {try {queue.remove();} catch (NoSuchElementException ne) {// element could be removed by poll()LOG.debug("Trying to remove from an empty " +"Queue. Ignoring exception " + ne);}}try {// 将要发送的消息添加到发送队列queue.add(buffer);} catch (IllegalStateException ie) {// This should never happenLOG.error("Unable to insert an element in the queue " + ie);}}

与要发送的服务器节点建立通信连接

synchronized void connectOne(long sid){if (senderWorkerMap.get(sid) != null) {LOG.debug("There is a connection already for server " + sid);return;}synchronized (self.QV_LOCK) {boolean knownId = false;// Resolve hostname for the remote server before attempting to// connect in case the underlying ip address has changed.self.recreateSocketAddresses(sid);Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();if (lastCommittedView.containsKey(sid)) {knownId = true;if (connectOne(sid, lastCommittedView.get(sid).electionAddr))return;}if (lastSeenQV != null && lastProposedView.containsKey(sid)&& (!knownId || (lastProposedView.get(sid).electionAddr !=lastCommittedView.get(sid).electionAddr))) {knownId = true;if (connectOne(sid, lastProposedView.get(sid).electionAddr))return;}if (!knownId) {LOG.warn("Invalid server id: " + sid);return;}}}synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){if (senderWorkerMap.get(sid) != null) {LOG.debug("There is a connection already for server " + sid);return true;}Socket sock = null;try {LOG.debug("Opening channel to server " + sid);if (self.isSslQuorum()) {SSLSocket sslSock = self.getX509Util().createSSLSocket();setSockOpts(sslSock);sslSock.connect(electionAddr, cnxTO);sslSock.startHandshake();sock = sslSock;LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());} else {sock = new Socket();setSockOpts(sock);sock.connect(electionAddr, cnxTO);}LOG.debug("Connected to server " + sid);// Sends connection request asynchronously if the quorum// sasl authentication is enabled. This is required because// sasl server authentication process may take few seconds to// finish, this may delay next peer connection requests.if (quorumSaslAuthEnabled) {initiateConnectionAsync(sock, sid);} else {// 处理连接initiateConnection(sock, sid);}return true;} catch (UnresolvedAddressException e) {}}public void initiateConnection(final Socket sock, final Long sid) {try {startConnection(sock, sid);} catch (IOException e) {LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",new Object[] { sid, sock.getRemoteSocketAddress() }, e);closeSocket(sock);return;}}

创建并启动发送器线程和接收器线程

private boolean startConnection(Socket sock, Long sid)throws IOException {DataOutputStream dout = null;DataInputStream din = null;try {// Use BufferedOutputStream to reduce the number of IP packets. This is// important for x-DC scenarios.// 通过输出流,向服务器发送数据BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());dout = new DataOutputStream(buf);// Sending id and challenge// represents protocol version (in other words - message type)dout.writeLong(PROTOCOL_VERSION);dout.writeLong(self.getId());String addr = formatInetAddr(self.getElectionAddress());byte[] addr_bytes = addr.getBytes();dout.writeInt(addr_bytes.length);dout.write(addr_bytes);dout.flush();// 通过输入流读取对方发送过来的选票din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));} catch (IOException e) {LOG.warn("Ignoring exception reading or writing challenge: ", e);closeSocket(sock);return false;}// authenticate learnerQuorumPeer.QuorumServer qps = self.getVotingView().get(sid);if (qps != null) {// TODO - investigate why reconfig makes qps null.authLearner.authenticate(sock, qps.hostname);}// If lost the challenge, then drop the new connection// 如果对方的 id比我的大,我是没有资格给对方发送连接请求的,直接关闭自己的客户端if (sid > self.getId()) {LOG.info("Have smaller server identifier, so dropping the " +"connection: (" + sid + ", " + self.getId() + ")");closeSocket(sock);// Otherwise proceed with the connection} else {// 初始化,发送器 和 接收器SendWorker sw = new SendWorker(sock, sid);RecvWorker rw = new RecvWorker(sock, din, sid, sw);sw.setRecv(rw);SendWorker vsw = senderWorkerMap.get(sid);if(vsw != null)vsw.finish();senderWorkerMap.put(sid, sw);queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));// 启动发送器线程和接收器线程sw.start();rw.start();return true;}return false;}

点击 SendWorker,并查找 该类下的 run方法

QuorumCnxManager.java

public void run() {threadCnt.incrementAndGet();try {/*** If there is nothing in the queue to send, then we* send the lastMessage to ensure that the last message* was received by the peer. The message could be dropped* in case self or the peer shutdown their connection* (and exit the thread) prior to reading/processing* the last message. Duplicate messages are handled correctly* by the peer.** If the send queue is non-empty, then we have a recent* message than that stored in lastMessage. To avoid sending* stale message, we should send the message in the send queue.*/ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);if (bq == null || isSendQueueEmpty(bq)) {ByteBuffer b = lastMessageSent.get(sid);if (b != null) {LOG.debug("Attempting to send lastMessage to sid=" + sid);send(b);}}} catch (IOException e) {LOG.error("Failed to send last message. Shutting down thread.", e);this.finish();}// 只要连接没有断开try {while (running && !shutdown && sock != null) {ByteBuffer b = null;try {ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);if (bq != null) {// 不断从发送队列 SendQueue中,获取发送消息,并执行发送b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);} else {LOG.error("No queue of incoming messages for " +"server " + sid);break;}	if(b != null){// 更新对于 sid这台服务器的最近一条消息lastMessageSent.put(sid, b);// 执行发送send(b);}} catch (InterruptedException e) {LOG.warn("Interrupted while waiting for message on queue",e);}}} catch (Exception e) {LOG.warn("Exception when using channel: for id " + sid+ " my id = " + QuorumCnxManager.this.mySid+ " error = " + e);}this.finish();LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());}}synchronized void send(ByteBuffer b) throws IOException {byte[] msgBytes = new byte[b.capacity()];try {b.position(0);b.get(msgBytes);} catch (BufferUnderflowException be) {LOG.error("BufferUnderflowException ", be);return;}// 输出流向外发送dout.writeInt(b.capacity());dout.write(b.array());dout.flush();}

点击 RecvWorker,并查找 该类下的 run方法

@Overridepublic void run() {threadCnt.incrementAndGet();try {// 只要连接没有断开while (running && !shutdown && sock != null) {/*** Reads the first int to determine the length of the* message*/int length = din.readInt();if (length <= 0 || length > PACKETMAXSIZE) {throw new IOException("Received packet with invalid packet: "+ length);}/*** Allocates a new ByteBuffer to receive the message*/byte[] msgArray = new byte[length];// 输入流接收消息din.readFully(msgArray, 0, length);ByteBuffer message = ByteBuffer.wrap(msgArray);// 接收对方发送过来的选票addToRecvQueue(new Message(message.duplicate(), sid));}} catch (Exception e) {LOG.warn("Connection broken for id " + sid + ", my id = "+ QuorumCnxManager.this.mySid + ", error = " , e);} finally {LOG.warn("Interrupting SendWorker");sw.finish();closeSocket(sock);}}}public void addToRecvQueue(Message msg) {synchronized(recvQLock) {if (recvQueue.remainingCapacity() == 0) {try {recvQueue.remove();} catch (NoSuchElementException ne) {// element could be removed by poll()LOG.debug("Trying to remove from an empty " +"recvQueue. Ignoring exception " + ne);}}try {// 将接收到的消息,放入接收消息队列recvQueue.add(msg);} catch (IllegalStateException ie) {// This should never happenLOG.error("Unable to insert element in the recvQueue " + ie);}}}

在 FastLeaderElection.java类中查找 WorkerReceiver线程。

        class WorkerReceiver extends ZooKeeperThread  {volatile boolean stop;QuorumCnxManager manager;WorkerReceiver(QuorumCnxManager manager) {super("WorkerReceiver");this.stop = false;this.manager = manager;}public void run() {Message response;while (!stop) {// Sleeps on receivetry {// 从 RecvQueue中取出选举投票消息(其他服务器发送过来的)response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);....} catch (InterruptedException e) {LOG.warn("Interrupted Exception while waiting for new message" +e.toString());}}LOG.info("WorkerReceiver is down");}}