> 文章列表 > Chapter12-主从同步机制

Chapter12-主从同步机制

Chapter12-主从同步机制

12.1 同步属性信息

        Slave 需要和 Master 同步的不只 是 消息本身,一些元数据信息也需要 同步,比如 TopicConfig 信息 、 ConsumerOffset 信息 、 DelayOffset 和SubscriptionGroupConfig 信息 。 Broker 在启动的时候,判断自己的角色是否是Slave ,是的话就启动定时同步任务

        在 syncAll 函数里,调用 syncTopicConfig ()、 syncConsumerOffset ()、
syncDelayOffset ()和 syncSubscriptionGroupConfig ()进行元数据同步 。 我们以
syncConsumerOffset 为例,来看看底层的具体实现。

        12.2 同步消息体

        本 节介绍 Master 和 Slave 之间同步消息体内容的方法,也就 是 同步CommitLog 内容的方法。 CommitLog 和元数据信息不同: 首先, CommitLog的数据量比元数据要大 ;其次 ,对实 时性和可靠性要求也不一样。 元数据信息是定时同步的 ,在两次同步的时间差里,如果出现异常可能会造成 Mastel" 上的元数据内容和 Slave 上 的元数据内容不一致 ,不过这种情况还可以补救 (手动调整 Offset ,重启 Consumer 等) 。 CommitLog 在高可靠性场景下如果没有及时同步, 一旦 Master 机器出故障, 消息就彻底丢失 了 。 所以有专 门的代码来实现Master 和Slave 之间消息体内容的同步 。

         主要 的实现代码在 Broker 模块的 org.apache.rocketmq.store.ha 包中 ,里 面包括 HAService 、 HAConnection 和 WaitNotifyObject 这三个类。

HAService 是实 现 commitLog 同步的 主体,它在 Master 机器和 Slave 机器上执行的逻辑不同, 默认是在 Master 机器上执行

        当 Broker 角色是 Slave 的时候 , MasterAddr 的值会被正确设置, 这样HAService 在启动的时候,在 HAClient 这个内部类中, connectMaster 会被正确执行 。

// org.apache.rocketmq.store.ha.DefaultHAClient#runpublic boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterHaAddress.get();if (addr != null) {SocketAddress socketAddress = NetworkUtil.string2SocketAddress(addr);this.socketChannel = RemotingHelper.connect(socketAddress);if (this.socketChannel != null) {this.socketChannel.register(this.selector, SelectionKey.OP_READ);log.info("HAClient connect to master {}", addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp = System.currentTimeMillis();}return this.socketChannel != null;}

        从代码中可以看出, HAClient 试图通过 Java NIO 函数去连接 Master 角色的 Broker 。 Master 角色有相应的监听代码。

        CommitLog 的同步,不是经过 nettycommand 的方式, 而是直接进行 TCP 连接,这样效率更高。 连接成功以后,通过对比 Master 和 Slave 的Offset ,不断进行同步。

        12.3 sync_master 和 async_master

        sync_ master 和 async_master 是 写 在 Broker 配置文件里的配置参数,这个参数影响的是主从同步的方式。 从字面意思理解, sync_master 是同步方式,也就是 Master 角色 Broker 中的消息要立刻同步过去; async_ master 是异步方式 ,也就是 Master 角色 Broker 中的消息是通过异步处理的方式 同步到 Slave 角色的机器上的 。 

    private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,int needAckNums) {if (needAckNums >= 0 && needAckNums <= 1) {return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService haService = this.defaultMessageStore.getHaService();long nextOffset = result.getWroteOffset() + result.getWroteBytes();// Wait enough acks from different slavesGroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return request.future();}

        在 CommitLog 类的 putMessage 函数末尾,调用 handleHA 函数 。 代码中的关键词是 wakeupAll 和 waitForFlush ,在同步方式下, Master 每次写消息的时候,都会等待向 Slave 同 步消息的过程 , 同步完成后再返回。

 

风尚购物网