
I. Overview of Principles and Core Concepts

1.1 Core Architecture

1.2 Core Concepts

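  1. controller: manages the master/slave relationships between brokers. It can be embedded in the NameServer (namesrv) without affecting the NameServer's own functionality, and it can also be deployed standalone.
  2. master/slave: the primary and backup roles.
  3. syncStateSet: literally the "synchronized state set". A slave that keeps up with the master in time is included in the syncStateSet.
  4. epoch: records the state at each master/slave switchover, so that a switchover does not cause data loss or inconsistency.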

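To make the processes easier to follow, you can think of the controller as the homeroom teacher, the master as a group leader, and the slaves as group members. Replication is the group members copying homework from the group leader, and the syncStateSet holds the members whose copies are fully up to date (the "excellent homework").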
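As a concrete reference point, the sketch below models the per-group state the controller keeps, reduced to the master, the syncStateSet, and their two epochs. It is a minimal illustration under assumptions: the class and method names are hypothetical, and it only mirrors behavior visible in the excerpts of sections 3.2 and 3.3 (a newly elected master starts with a syncStateSet containing only itself, and accepted changes bump an epoch).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified per-replica-group state (one brokerName); not RocketMQ's SyncStateInfo.
class ReplicaGroupState {
    private Long masterBrokerId;                       // null while the group has no master
    private int masterEpoch;                           // bumped on every master switchover
    private Set<Long> syncStateSet = new HashSet<>();
    private int syncStateSetEpoch;                     // bumped on every syncStateSet change

    // A newly elected master starts out with a syncStateSet that contains only itself.
    void electMaster(long newMaster) {
        masterBrokerId = newMaster;
        masterEpoch++;
        updateSyncStateSet(Collections.singleton(newMaster));
    }

    // Expand/Shrink replaces the whole set and bumps only the syncStateSet epoch.
    void updateSyncStateSet(Set<Long> newSet) {
        syncStateSet = new HashSet<>(newSet);
        syncStateSetEpoch++;
    }

    Long getMasterBrokerId() { return masterBrokerId; }
    int getMasterEpoch() { return masterEpoch; }
    int getSyncStateSetEpoch() { return syncStateSetEpoch; }
    Set<Long> getSyncStateSet() { return Collections.unmodifiableSet(syncStateSet); }
}
```

Because requests carry the epochs they were built against, a request based on an older epoch can be recognized as stale; section 3.3 shows the controller rejecting such requests.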

II. Relevant Source Files

The core consists of "controller + broker + replication", so the code is described in three parts.

2.1 Controller

The code for this part lives mainly in the rocketmq-controller module. The main files are:

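  • ControllerManager: manages the controller; it holds many controller-related configurations and handles core functions such as heartbeat management. (The homeroom teacher's rulebook.)
  • DLedgerController: the DLedger-based implementation of Controller, providing the controller's basic functionality: replica information management, broker liveness detection, master election, and other core functions. (One kind of homeroom teacher.)
  • DefaultBrokerHeartbeatManager: manages broker heartbeats. It holds the broker liveness table and the listeners invoked when a broker goes offline, so that a re-election is triggered when a replica drops out. (The attendance register.)
  • ReplicasInfoManager: processes the controller's events, i.e., election events, SyncStateSet change events, and so on. (The group register.)
  • ControllerRequestProcessor: handles requests sent to the controller, such as asking it to hold an election, registering a broker, heartbeats, and changing the SyncStateSet. (The homeroom teacher's mailbox.)
  • DefaultElectPolicy: the policy for electing a master. It can elect from the replicas in sync state, and it also supports "unclean" election from all replicas, whether in sync or not. (The class rules.)
  • ......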
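The clean versus unclean distinction described for DefaultElectPolicy can be illustrated with a small sketch. This is not DefaultElectPolicy's code: the class name, the liveness predicate, and the tie-breaking rule (lowest alive brokerId wins) are assumptions made to keep the example deterministic; a production policy would more sensibly rank candidates, for example by how much data they have replicated.

```java
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical election policy sketch; not RocketMQ's DefaultElectPolicy.
class ElectPolicySketch {
    private final boolean enableUncleanElection; // allow electing replicas outside the syncStateSet
    private final LongPredicate isAlive;         // liveness check, e.g. backed by heartbeats

    ElectPolicySketch(boolean enableUncleanElection, LongPredicate isAlive) {
        this.enableUncleanElection = enableUncleanElection;
        this.isAlive = isAlive;
    }

    // Returns the brokerId of the new master, or null if no replica qualifies.
    Long elect(Set<Long> syncStateSet, Set<Long> allReplicas, Long oldMaster, Long assignedBrokerId) {
        // A designated broker (forced election) wins only if it is eligible.
        if (assignedBrokerId != null) {
            return isEligible(assignedBrokerId, syncStateSet) ? assignedBrokerId : null;
        }
        // Keep the old master if it is still alive and in sync.
        if (oldMaster != null && isAlive.test(oldMaster) && syncStateSet.contains(oldMaster)) {
            return oldMaster;
        }
        // Clean election: only replicas in the syncStateSet are candidates.
        Long candidate = lowestAlive(syncStateSet);
        if (candidate != null || !enableUncleanElection) {
            return candidate;
        }
        // Unclean election: any alive replica, at the risk of losing unreplicated data.
        return lowestAlive(allReplicas);
    }

    private boolean isEligible(long brokerId, Set<Long> syncStateSet) {
        return isAlive.test(brokerId) && (enableUncleanElection || syncStateSet.contains(brokerId));
    }

    private Long lowestAlive(Set<Long> candidates) {
        return candidates.stream().filter(id -> isAlive.test(id)).min(Long::compare).orElse(null);
    }
}
```

The trade-off is explicit in the last branch: unclean election restores availability when every in-sync replica is down, but the new master may lack messages that only the old master had.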

2.2 Broker

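The code for this part lives mainly in the rocketmq-broker module, under org/apache/rocketmq/broker/controller:

  • ReplicasManager: carries out the broker's duties as a replica: locating the controller, managing its role, and, when it is the master, updating (Expanding/Shrinking) the SyncStateSet.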
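To illustrate what Expand/Shrink means on the master side, here is a minimal sketch. Assumptions: the class, its methods, and the maxLagMillis threshold are hypothetical, and a slave counts as in sync if it has fully caught up with the master within that window (section 3.4 shows the master recording a slave's last caught-up time via updateConnectionLastCaughtUpTime). The real logic may also consider replication offsets, which this sketch ignores. The master only proposes a set; the controller must accept it (section 3.3) before it takes effect.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical master-side bookkeeping for Expand/Shrink proposals; not RocketMQ's ReplicasManager.
class SyncStateSetTracker {
    private final long masterId;
    private final long maxLagMillis;                        // assumed "in sync" window
    private final Map<Long, Long> lastCaughtUpTime = new HashMap<>();
    private Set<Long> syncStateSet = new HashSet<>();       // last set accepted by the controller

    SyncStateSetTracker(long masterId, long maxLagMillis) {
        this.masterId = masterId;
        this.maxLagMillis = maxLagMillis;
        this.syncStateSet.add(masterId);
    }

    // Record that a slave had nothing left to receive at time nowMillis.
    void markCaughtUp(long slaveId, long nowMillis) {
        lastCaughtUpTime.put(slaveId, nowMillis);
    }

    // Master plus every slave that caught up within maxLagMillis.
    // Newly qualifying slaves are added (Expand); lagging ones drop out (Shrink).
    Set<Long> proposeSyncStateSet(long nowMillis) {
        Set<Long> proposal = new HashSet<>();
        proposal.add(masterId); // the controller rejects any set without the master
        for (Map.Entry<Long, Long> e : lastCaughtUpTime.entrySet()) {
            if (nowMillis - e.getValue() <= maxLagMillis) {
                proposal.add(e.getKey());
            }
        }
        return proposal;
    }

    // True if the proposal differs from the accepted set, i.e. an alter request should be sent.
    boolean needsAlter(long nowMillis) {
        return !proposeSyncStateSet(nowMillis).equals(syncStateSet);
    }

    // Called only after the controller has accepted the new set.
    void onAccepted(Set<Long> accepted) {
        syncStateSet = new HashSet<>(accepted);
    }

    Set<Long> current() {
        return new HashSet<>(syncStateSet);
    }
}
```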

2.3 Replication Module

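The code for this part lives mainly in the ha folder of the rocketmq-store module:

  • HAService: a service every replica has; it manages the replica's synchronization tasks in its master or slave role.
  • HAClient: the client inside every slave's HAService; it handles the reads and writes of the synchronization task.
  • HAConnection: an HA connection on the master; in principle each connection corresponds to one slave. The class holds much of the transfer state, including the channel, the connection state, the current transfer offset, and so on.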

III. Core Flows

3.1 Heartbeat

Core request code: BROKER_HEARTBEAT

Broker side:

This part is simple: the broker sends a request carrying this code to the controller, so it is not covered in detail:

BrokerController.sendHeartbeat() -> brokerOuterAPI.sendHeartbeat()

Controller side:

1. First, ControllerRequestProcessor receives the request code and enters the handling logic:

private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
    final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
    if (requestHeader.getBrokerId() == null) {
        return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
    }
    this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
                                            requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
    return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
}

2. Then onBrokerHeartbeat() mainly updates the brokerLiveTable in the controller's brokerHeartbeatManager:
public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
                              Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
    BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId);
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo);
    ......
    if (null == prev) {
        this.brokerLiveTable.put(...);
        log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId);
    } else {
        // refresh the existing entry's fields (last update time, timeout, channel, epoch, offsets, ...)
        prev.setXXX(......);
    }
}
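The liveness side of the heartbeat manager can be pictured as follows: heartbeats insert or refresh an entry, and a periodic scan evicts entries whose timeout has elapsed and notifies listeners, which is where a re-election would be triggered (section 3.2, case 1a). This is a simplified sketch; the class, the string key format, and the method names are assumptions, and the expiry rule (last update plus the broker-supplied timeout) is inferred from the heartbeat fields rather than copied from RocketMQ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical liveness table; keys like "cluster/brokerName/brokerId" are an assumption.
class BrokerLiveTableSketch {
    private static final class LiveInfo {
        volatile long lastUpdateMillis;
        volatile long timeoutMillis;

        LiveInfo(long lastUpdateMillis, long timeoutMillis) {
            this.lastUpdateMillis = lastUpdateMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Map<String, LiveInfo> table = new ConcurrentHashMap<>();
    private final List<Consumer<String>> inactiveListeners = new ArrayList<>();

    void addInactiveListener(Consumer<String> listener) {
        inactiveListeners.add(listener);
    }

    // Like onBrokerHeartbeat(): register on the first heartbeat, refresh on later ones.
    void onHeartbeat(String brokerKey, long timeoutMillis, long nowMillis) {
        LiveInfo prev = table.get(brokerKey);
        if (prev == null) {
            table.put(brokerKey, new LiveInfo(nowMillis, timeoutMillis));
        } else {
            prev.lastUpdateMillis = nowMillis;
            prev.timeoutMillis = timeoutMillis;
        }
    }

    // Present in the table = considered alive until the next scan evicts it.
    boolean isAlive(String brokerKey) {
        return table.containsKey(brokerKey);
    }

    // Periodic scan: evict expired brokers and notify listeners (e.g. to trigger an election).
    void scanInactiveBrokers(long nowMillis) {
        Iterator<Map.Entry<String, LiveInfo>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LiveInfo> entry = it.next();
            LiveInfo info = entry.getValue();
            if (info.lastUpdateMillis + info.timeoutMillis < nowMillis) {
                it.remove();
                for (Consumer<String> listener : inactiveListeners) {
                    listener.accept(entry.getKey());
                }
            }
        }
    }
}
```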

3.2 Election

Related request code: CONTROLLER_ELECT_MASTER

An election can be triggered in the following situations:

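1. Initiated by the controller itself, via triggerElectMaster():

   a. The HeartbeatManager detects that a broker's heartbeat has expired. (The teacher finds that a group member has dropped out.)

   b. The controller detects that a replica set has no master. (The teacher finds that a group leader is still on the roster but is gone.)

2. Initiated by a broker that tries to elect itself as master, via ReplicasManager.brokerElect():

   a. When the broker queries metadata from the controller, it finds no master information. (A student periodically checks on the group and asks the teacher why there is no group leader.)

   b. After registering with the controller, the broker still has not received master information from it. (A student reports in, finds there is no group leader, and says so.)

3. Initiated through the admin tools:

   a. Via the election command ReElectMasterSubCommand. (The principal appoints a leader directly.)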

All of the above eventually end up in:

controller.electMaster() -> replicasInfoManager.electMaster()

// In other words, every group leader must be appointed by the homeroom teacher.

public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request,
                                                               final ElectPolicy electPolicy) {
    ...
    // extract brokerName, brokerId, oldMaster, syncStateSet, etc. from the request and the replica tables
    ...
    if (syncStateInfo.isFirstTimeForElect()) {
        // first election for this replica set: appoint the requesting broker directly
        newMaster = brokerId;
    }

    // otherwise, elect a master according to the election policy
    if (newMaster == null) {
        // we should assign this assignedBrokerId when the brokerAddress need to be elected by force
        Long assignedBrokerId = request.getDesignateElect() ? brokerId : null;
        newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId);
    }

    if (newMaster != null && newMaster.equals(oldMaster)) {
        // old master == new master
        // old master still valid, change nothing
        String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName());
        LOGGER.warn("{}", err);
        // the master still exist
        response.setXXX(...);
        result.setBody(new ElectMasterResponseBody(syncStateSet).encode());
        result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
        return result;
    }

    // a new master is elected
    if (newMaster != null) {
        // a different broker becomes the new master
        final int masterEpoch = syncStateInfo.getMasterEpoch();
        final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
        // the new syncStateSet starts with only the new master
        final HashSet<Long> newSyncStateSet = new HashSet<>();
        newSyncStateSet.add(newMaster);
        response.setXXX(...);
        final ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet);
        result.setBody(responseBody.encode());
        final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
        result.addEvent(event);
        return result;
    }

    // reaching this point means no master could be elected
    // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress),
    // we still need to apply an ElectMasterEvent to tell the statemachine
    // that the master was shutdown and no new master was elected.
    if (request.getBrokerId() == null) {
        final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
        result.addEvent(event);
        result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master");
    } else {
        result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master");
    }
    return result;
}
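Stripped of bookkeeping, electMaster() ends in one of four outcomes. The sketch below condenses that decision into a pure function so the branches are easier to see. The class and enum are hypothetical; the outcome names merely echo the response codes in the excerpt (CONTROLLER_MASTER_STILL_EXIST, CONTROLLER_MASTER_NOT_AVAILABLE, CONTROLLER_ELECT_MASTER_FAILED), and event handling is left out.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical condensation of electMaster()'s final branches; not RocketMQ code.
final class ElectOutcomeSketch {
    enum Outcome { MASTER_STILL_EXIST, NEW_MASTER_ELECTED, MASTER_NOT_AVAILABLE, ELECT_MASTER_FAILED }

    static final class Result {
        final Outcome outcome;
        final Set<Long> syncStateSet; // the set returned to the caller (empty when there is no master)

        Result(Outcome outcome, Set<Long> syncStateSet) {
            this.outcome = outcome;
            this.syncStateSet = syncStateSet;
        }
    }

    private ElectOutcomeSketch() {
    }

    // requestBrokerId == null means the controller itself triggered the election.
    static Result classify(Long newMaster, Long oldMaster, Set<Long> currentSyncStateSet, Long requestBrokerId) {
        if (newMaster != null && newMaster.equals(oldMaster)) {
            // nothing changes: the old master and its syncStateSet stay as they are
            return new Result(Outcome.MASTER_STILL_EXIST, currentSyncStateSet);
        }
        if (newMaster != null) {
            // a different master: its syncStateSet restarts with only itself
            return new Result(Outcome.NEW_MASTER_ELECTED, Collections.singleton(newMaster));
        }
        // no master could be elected; who asked decides which failure is reported
        return new Result(requestBrokerId == null ? Outcome.MASTER_NOT_AVAILABLE : Outcome.ELECT_MASTER_FAILED,
                Collections.emptySet());
    }
}
```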

3.3 Updating the SyncStateSet

Core request code: CONTROLLER_ALTER_SYNC_STATE_SET

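1. The master initiates the request, asking the controller to replace the syncStateSet (equivalent to the group leader reporting whose homework is up to date).

2. ControllerRequestProcessor receives the request and enters handleAlterSyncStateSet():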

private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
                                                RemotingCommand request) throws Exception {
    final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
    final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
    final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
    if (future != null) {
        return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
    }
    return RemotingCommand.createResponseCommand(null);
}

3. The request then goes through Controller.alterSyncStateSet() -> replicasInfoManager.alterSyncStateSet():

public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
    final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet,
    final BrokerValidPredicate brokerAlivePredicate) {
    final String brokerName = request.getBrokerName();
    ...
    final Set<Long> newSyncStateSet = syncStateSet.getSyncStateSet();
    final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
    final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);

    // check whether the syncStateSet actually changed
    final Set<Long> oldSyncStateSet = syncStateInfo.getSyncStateSet();
    if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) {
        String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet";
        ...
    }

    // check that the request comes from the current master
    if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
        String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
                                   syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId());
        ...
    }

    // check that the master epoch matches
    if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
        String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}",
                                   syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
        ...
    }

    // check that the syncStateSet epoch matches
    if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) {
        String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}",
                                   syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch());
        ...
    }

    // validate every replica in the new syncStateSet
    for (Long replica : newSyncStateSet) {
        // the replica must exist
        if (!brokerReplicaInfo.isBrokerExist(replica)) {
            String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica);
            ...
        }
        // the broker must be alive
        if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) {
            String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica);
            ...
        }
    }
    
    // the new syncStateSet must still contain the master
    if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) {
        String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId());
        ...
    }

    // bump the syncStateSet epoch
    int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
    ...
    // emit an event that replaces the syncStateSet
    final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
    ...
}

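4. Finally, when the event is applied, syncStateInfo.updateSyncStateSetInfo() updates the syncStateInfo obtained via syncStateSetInfoTable.get(brokerName). (Think of it as the teacher finding the group leader's name in the class roster, pulling out the member list, and updating it.)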
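The checks in alterSyncStateSet() can be condensed into a single validation function, shown below as a sketch. The class name, parameter list, and rejection messages are hypothetical; the order of the checks follows the excerpt. Note how the two epoch checks act as a fence: if a new master was elected, or the set was changed, after the master built its request, the epochs no longer match and the stale request is rejected.

```java
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical condensation of alterSyncStateSet()'s checks; not RocketMQ code.
final class AlterSyncStateSetValidator {
    private AlterSyncStateSetValidator() {
    }

    // Returns null if the request may be applied, otherwise the reason for rejecting it.
    static String validate(long currentMaster, int currentMasterEpoch, int currentSyncStateSetEpoch,
                           Set<Long> oldSet, Set<Long> knownReplicas, LongPredicate isAlive,
                           long requestMaster, int requestMasterEpoch, int requestSyncStateSetEpoch,
                           Set<Long> newSet) {
        if (oldSet.equals(newSet)) {
            return "newSyncStateSet equals oldSyncStateSet, nothing to update";
        }
        if (requestMaster != currentMaster) {
            return "request does not come from the current master";
        }
        if (requestMasterEpoch != currentMasterEpoch) {
            return "stale master epoch";
        }
        if (requestSyncStateSetEpoch != currentSyncStateSetEpoch) {
            return "stale syncStateSet epoch";
        }
        for (long replica : newSet) {
            if (!knownReplicas.contains(replica)) {
                return "replica " + replica + " does not exist";
            }
            if (!isAlive.test(replica)) {
                return "replica " + replica + " is not alive";
            }
        }
        if (!newSet.contains(currentMaster)) {
            return "newSyncStateSet does not contain the master";
        }
        return null;
    }
}
```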

3.4 Replication

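This part is more involved: HAService, HAClient, HAConnection and the various Service/Reader/Writer classes inside them are easy to mix up, which makes the code hard to read. The diagram below was drawn to help (it is most useful after a first skim of the source):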

Below, the HA replication process is broken down step by step:

  1. Every replica's DefaultMessageStore registers an HAService, which manages HA replication.
  2. The master's HAService contains an AcceptSocketService, which accepts incoming slave connections:
protected abstract class AcceptSocketService extends ServiceThread {
    ...

    /**
     * Starts listening to slave connections.
     *
     * @throws Exception If fails.
     */
    public void beginAccept() throws Exception {
        ...    
    }

    @Override
    public void shutdown(final boolean interrupt) {
        ...
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();

                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if (k.isAcceptable()) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                            if (sc != null) {
                                DefaultHAService.log.info("HAService receive new connection, "
                                                          + sc.socket().getRemoteSocketAddress());
                                try {
                                    HAConnection conn = createConnection(sc);
                                    conn.start();
                                    DefaultHAService.this.addConnection(conn);
                                } catch (Exception e) {
                                    log.error("new HAConnection exception", e);
                                    sc.close();
                                }
                            }
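                        }
                        // ... (handling of other ready operations elided)
                    }
                    // clear the processed keys so they are not handled again on the next select()
                    selected.clear();
                }
            } catch (Exception e) {
                ...
            }
        }
    }
}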
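For readers less familiar with java.nio, the accept pattern used by AcceptSocketService can be reproduced in a standalone sketch. The class name AcceptLoopSketch and the acceptOnce() method are hypothetical, and accepted channels are simply collected in a list where the real service wraps each one in an HAConnection via createConnection() and starts it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal non-blocking accept loop in the style of AcceptSocketService.
class AcceptLoopSketch implements AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;
    final List<SocketChannel> accepted = new ArrayList<>();

    AcceptLoopSketch(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", port)); // port 0 lets the OS pick a free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int localPort() {
        return server.socket().getLocalPort();
    }

    // One iteration of the run() loop: wait up to timeoutMillis, then accept every ready connection.
    int acceptOnce(long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        int count = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey k = it.next();
            it.remove(); // remove the key so it is not processed again on the next select()
            if (k.isAcceptable()) {
                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                if (sc != null) {
                    accepted.add(sc); // the real service would create and start an HAConnection here
                    count++;
                }
            }
        }
        return count;
    }

    @Override
    public void close() throws IOException {
        for (SocketChannel sc : accepted) {
            sc.close();
        }
        server.close();
        selector.close();
    }
}
```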

  3. Each slave's HAService contains an HAClient, which connects to the master and drives the transfer.

public class AutoSwitchHAClient extends ServiceThread implements HAClient {
    ...
}

public interface HAClient {
    void start();
    
    void shutdown();
    
    void wakeup();

    void updateMasterAddress(String newAddress);

    void updateHaMasterAddress(String newAddress);

    String getMasterAddress();

    String getHaMasterAddress();

    long getLastReadTimestamp();

    long getLastWriteTimestamp();

    HAConnectionState getCurrentState();

    void changeCurrentState(HAConnectionState haConnectionState);

    void closeMaster();

    long getTransferredByteInSecond();
}

  4. When the master accepts a connection from a slave, it creates an HAConnection, which sends and receives data on that connection.

    public interface HAConnection {
        void start();
    
        void shutdown();
    
        void close();
    
        SocketChannel getSocketChannel();
    
        HAConnectionState getCurrentState();
    
        String getClientAddress();
    
        long getTransferredByteInSecond();
    
        long getTransferFromWhere();
    
        long getSlaveAckOffset();
    }

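  5. The master's HAConnection and the slave's HAClient are the two ends of one connection. Both write to the socket through an HAWriter (simple enough to skip here; see the HAWriter class) and read from it through an HAReader; the only difference is that the master uses HAServerReader and the slave uses HAClientReader: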

    public abstract class AbstractHAReader {
        private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
        protected final List<HAReadHook> readHookList = new ArrayList<>();
    
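        public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) {
            int readSizeZeroTimes = 0;
            while (byteBufferRead.hasRemaining()) {
                ... // read from socketChannel into byteBufferRead (elided)
                boolean result = processReadResult(byteBufferRead);
                ... // stop on failure or after repeated zero-byte reads (elided)
            }
            return true;
        }

        ...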
        
        protected abstract boolean processReadResult(ByteBuffer byteBufferRead);
    }
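A filled-in version of the read() loop might look like the sketch below. The elided parts of the excerpt are reconstructed under assumptions: the class is hypothetical, the limit of three consecutive zero-byte reads is an assumed value suggested by the readSizeZeroTimes counter, and end-of-stream or an I/O error is treated as a failed read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical reconstruction of the read() loop; not RocketMQ's AbstractHAReader.
abstract class HAReaderSketch {
    private static final int MAX_ZERO_READS = 3; // assumed number of empty reads before giving up

    // true: buffer full or no more data for now; false: peer closed, I/O error, or bad data.
    boolean read(ReadableByteChannel channel, ByteBuffer byteBufferRead) {
        int readSizeZeroTimes = 0;
        while (byteBufferRead.hasRemaining()) {
            try {
                int readSize = channel.read(byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    if (!processReadResult(byteBufferRead)) {
                        return false;
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= MAX_ZERO_READS) {
                        break; // nothing more available right now
                    }
                } else {
                    return false; // end of stream: the peer closed the connection
                }
            } catch (IOException e) {
                return false;
            }
        }
        return true;
    }

    // Subclasses (client/server readers) parse the frames now present in the buffer.
    protected abstract boolean processReadResult(ByteBuffer byteBufferRead);
}
```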

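  6. Both HAReaders implement processReadResult(), which handles the data read from the socket. The client's version deserves a closer look because it is where received data gets written into the commitlog. The client's processReadResult():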

    @Override
    protected boolean processReadResult(ByteBuffer byteBufferRead) {
        int readSocketPos = byteBufferRead.position();
        try {
            while (true) {
                ...
                if (...) { // a complete frame header has arrived (condition elided)
                    switch (AutoSwitchHAClient.this.currentState) {
                        case HANDSHAKE: {
                            ...
                            // handshake phase: check commitlog integrity first and truncate it if needed
                            break;
                        }
                        case TRANSFER: {
                            ...
                            byte[] bodyData = new byte[bodySize];
                            ...
                            if (bodySize > 0) {
                                // transfer phase: append the body to the commitlog at masterOffset
                                AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
                            }
                            haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
                            ...
                            break;
                        }
                        default:
                            break;
                    }
                    if (isComplete) {
                        continue;
                    }
                }
                // check whether the buffer still holds data; if so, compact()
                ...
                break;
            }
        }
        ... // catch block elided
    }
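The TRANSFER branch plus the final compact() step amount to a classic length-prefixed frame decoder. The sketch below uses a deliberately simplified, hypothetical frame layout, [int bodySize][long masterOffset][body], whereas the real header carries more fields (such as the master's confirmOffset). A ByteArrayOutputStream stands in for the commitlog, and rejecting a frame whose offset does not continue the local log is an assumed consistency rule.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical slave-side decoder for a simplified frame: [int bodySize][long masterOffset][body].
// Not RocketMQ's wire format; the real header carries more fields.
class TransferFrameDecoder {
    static final int HEADER_SIZE = 4 + 8;

    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream(); // stand-in for the commitlog
    private long maxPhyOffset; // end of the local log = next offset expected from the master

    // buf is in write mode (position = end of received bytes). Every complete frame is appended to the
    // local log; a trailing partial frame is compacted to the front and finished on a later call.
    // On false the buffer state is unspecified and the caller is expected to drop the connection.
    boolean processReadResult(ByteBuffer buf) {
        buf.flip();
        while (buf.remaining() >= HEADER_SIZE) {
            int start = buf.position();
            int bodySize = buf.getInt(start);
            long masterOffset = buf.getLong(start + 4);
            if (buf.remaining() < HEADER_SIZE + bodySize) {
                break; // body not fully received yet
            }
            if (masterOffset != maxPhyOffset) {
                return false; // gap or overlap: the two logs have diverged
            }
            buf.position(start + HEADER_SIZE);
            byte[] body = new byte[bodySize];
            buf.get(body);
            commitLog.write(body, 0, body.length);
            maxPhyOffset += bodySize;
        }
        buf.compact(); // keep unread bytes, switch back to write mode
        return true;
    }

    long getMaxPhyOffset() {
        return maxPhyOffset;
    }

    byte[] commitLogBytes() {
        return commitLog.toByteArray();
    }
}
```

Keeping the buffer in write mode between calls and compacting after each pass is what lets a frame that straddles two socket reads be completed on the next call.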

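  7. The server's processReadResult() mainly handles the client's handshake and similar requests, and is fairly simple. What deserves more explanation is how its WriteSocketService uses the HAWriter to write data to the socket: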

    abstract class AbstractWriteSocketService extends ServiceThread {
        ...
    
        private void transferToSlave() throws Exception {
            ...
            int size = this.getNextTransferDataSize();
            if (size > 0) {
                ...
                buildTransferHeaderBuffer(this.transferOffset, size);
                this.lastWriteOver = this.transferData(size);
            } else {
                // nothing to transfer: the slave has caught up, so record its last caught-up time
                AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis());
                haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        }
    
        @Override
        public void run() {
            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
    
                    switch (currentState) {
                        case HANDSHAKE:
                            // Wait until the slave sends its handshake message to the master, then reply to it.
                            break;
                        case TRANSFER:
                            ...
                            transferToSlave();
                            break;
                        default: ...
                    }
                } catch (Exception e) {
                    ...
                }
            }
            ... // cleanup after the service stops (elided)
        }
        ...
    }
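The decision inside transferToSlave() can be isolated as below: send up to one batch of whatever the master has beyond the slave's transfer offset, and when there is nothing to send, record that the slave has caught up. The class, the maxBatchBytes cap, and the method names are hypothetical; building the header and writing to the socket are left out.

```java
// Hypothetical model of the size decision in transferToSlave(); not RocketMQ code.
class TransferStepSketch {
    private final int maxBatchBytes;     // assumed upper bound on one transfer
    private long transferOffset;         // next commitlog offset to send to this slave
    private long lastCaughtUpTime = -1;  // -1 until the slave has caught up once

    TransferStepSketch(long startOffset, int maxBatchBytes) {
        this.transferOffset = startOffset;
        this.maxBatchBytes = maxBatchBytes;
    }

    // Bytes the master has beyond transferOffset, capped at one batch.
    int nextTransferDataSize(long masterMaxOffset) {
        long available = masterMaxOffset - transferOffset;
        return (int) Math.max(0L, Math.min(available, (long) maxBatchBytes));
    }

    // One step: "send" a batch if there is data, otherwise record that the slave has caught up.
    int step(long masterMaxOffset, long nowMillis) {
        int size = nextTransferDataSize(masterMaxOffset);
        if (size > 0) {
            // a real implementation would build a header for (transferOffset, size) and write header + body here
            transferOffset += size;
        } else {
            lastCaughtUpTime = nowMillis;
        }
        return size;
    }

    long getTransferOffset() {
        return transferOffset;
    }

    long getLastCaughtUpTime() {
        return lastCaughtUpTime;
    }
}
```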

Here is the server's processReadResult() as well, which reads data from the socket:

    @Override
    protected boolean processReadResult(ByteBuffer byteBufferRead) {
        while (true) {
            ...
            // decode the state the slave reports at readPosition
            HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
            switch (slaveState) {
                case HANDSHAKE:
                    // received the client's handshake
                    ...
                    LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
                                AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner);
                    break;
                case TRANSFER:
                    // received a TRANSFER-state report from the client
                    ...
                    // update the client's state information
                    break;
                default:
                    ...
            }
            ...
        }
    }
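The ack offsets a master receives from slaves (compare getSlaveAckOffset() in the HAConnection interface) are what make a confirm offset possible, like the one the client applies via updateConfirmOffset(). The sketch below shows one plausible derivation: the smallest offset acknowledged by every syncStateSet member, capped by the master's own max offset. This is an assumption about how the pieces fit together, not a transcription of RocketMQ's code; the class and method names are hypothetical, and a member with no ack yet is conservatively counted as 0.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical derivation of a confirm offset from slave acks; not RocketMQ code.
final class ConfirmOffsetSketch {
    private ConfirmOffsetSketch() {
    }

    // Smallest offset acknowledged by every syncStateSet member, capped by the master's own max offset.
    // A member that has not acknowledged anything yet counts as 0, which holds the confirm offset back.
    static long computeConfirmOffset(long masterId, long masterMaxOffset,
                                     Set<Long> syncStateSet, Map<Long, Long> slaveAckOffsets) {
        long confirm = masterMaxOffset;
        for (long brokerId : syncStateSet) {
            if (brokerId == masterId) {
                continue; // the master's own progress is masterMaxOffset
            }
            confirm = Math.min(confirm, slaveAckOffsets.getOrDefault(brokerId, 0L));
        }
        return confirm;
    }
}
```

This also shows why the syncStateSet matters: only its members hold back the confirm offset, so a lagging slave that has been shrunk out of the set no longer delays it.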

3.5 Electing the Active Controller

The active controller is elected through DLedger: in DLedgerController, RoleChangeHandler.handle() updates the controller's own role:

    class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
    
        private final String selfId;
        private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
        private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
    
        public RoleChangeHandler(final String selfId) {
            this.selfId = selfId;
        }
    
        @Override
        public void handle(long term, MemberState.Role role) {
            Runnable runnable = () -> {
                switch (role) {
                    case CANDIDATE:
                        this.currentRole = MemberState.Role.CANDIDATE;
                        // stop the task that scans for inactive brokers
                        ...
                        break;
                    case FOLLOWER:
                        this.currentRole = MemberState.Role.FOLLOWER;
                        // stop the task that scans for inactive brokers
                        ...
                        break;
                    case LEADER: {
                        log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
                        int tryTimes = 0;
                        while (true) {
                            // try the initial proposal; on success, start scanning for inactive brokers
                            // and break out of the loop (elided)
                            ...
                        }
                        break;
                    }
                }
            };
            this.executorService.submit(runnable);
        }
        ...
    }
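The essential behavior of the role-change handler can be sketched without DLedger: only the leader runs the inactive-broker scan, and role changes are applied one at a time on a single-threaded executor. Everything here is hypothetical (the class, the enum, and a scanning flag standing in for the scheduled scan task); the initial proposal that the real leader tries first, judging by the log message in the excerpt, is left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: only the leader runs the inactive-broker scan. Not RocketMQ code.
class RoleChangeSketch {
    enum Role { CANDIDATE, FOLLOWER, LEADER }

    // A single thread applies role changes one at a time, in the order they arrive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean scanning = new AtomicBoolean(false); // stands in for the scheduled scan task
    private volatile Role currentRole = Role.FOLLOWER;

    Future<?> handle(Role role) {
        return executor.submit(() -> {
            switch (role) {
                case CANDIDATE:
                case FOLLOWER:
                    // both non-leader roles stop the scan
                    scanning.set(false);
                    currentRole = role;
                    break;
                case LEADER:
                    // the real handler first tries an initial proposal (judging by its log message)
                    currentRole = Role.LEADER;
                    scanning.set(true);
                    break;
                default:
                    break;
            }
        });
    }

    boolean isScanning() {
        return scanning.get();
    }

    Role getCurrentRole() {
        return currentRole;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```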