RocketMQ.3-NameServer如何进行路由管理

2020/06/13 MQ 共 5008 字,约 15 分钟

今天是学习RocketMQ的第三天,继续来聊聊NameServer。上一篇分析了启动流程、心跳机制以及优雅停机,我们知道除了心跳机制NameServer还会接受broker的注册并提供路由管理功能,今天就来聊聊NameServer是如何进行路由管理的。

说到NameServer的路由管理功能就不得不提到org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager这个类。

要知道,作为消息队列的“注册中心”,NameServer不仅需要为生产者和消费者提供关于Topic的路由信息,还得管理集群中的所有Broker节点,包括时刻监控Broker的存活状态、接收Broker的注册信息,而这些路由的基础信息,都是集成在RouteInfoManager这个类中的。

1. 元信息

RouteInfoManager类的命名就可以想到,这个类是路由信息的管理类,而通常我们将这些描述信息称之为“元信息”。

我们来看看RouteInfoManager类都存了些什么内容,

2020061201

// 关于topic的元信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker基础信息,包括broker所属集群的名称、broker的名称以及主备Broker地址。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker集群信息,存储集群中所有Broker的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker状态信息,主要是接收broker心跳包更新时间戳,用于路由删除
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker上的FilterServer列表,用于类模式消息过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

RouteInfoManager类中有5个HashMap,分别存放了不同的元信息,比如上一篇提到的brokerLiveTable,就是存放Broker节点发送来的的心跳包更新时间戳,NameServer就是根据brokerLiveTable属性中BrokerLiveInfo对象的lastUpdateTimestamp时间戳属性来判断该Broker节点是否可用的。

在现在这个阶段我们不需要完全知道这些元信息各自代表了什么,日后讲到的时候再去源码中探索就可以,其实无非是一些数据结构而已。

2. 服务注册

说完了元信息,我们再来聊聊NameServer实现路由管理的原理,首先是服务注册。既然是注册,那必然是“我”主动在某个网站上注册,在消息队列的场景中,就是是BrokerNameServer上注册,对于NameServer而言只需要接收Broker发送的心跳包然后更新相应的路由元信息即可,来看看源码中具体是如何实现的。

2.1 Broker 发送心跳包

既然是BrokerNameServer发送注册信息,那么当然要找Broker的启动类了,按照上一篇所描述的方法,找到org.apache.rocketmq.broker.BrokerStartup类的启动方法,在启动时会注册一个定时线程池。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

BrokerConfig#registerNameServerPeriod属性的默认值是30秒,它决定了BrokerNameServer注册的时间间隔,我们可以通过在启动Broker时用-c参数指定配置文件,在配置文件中修改registerNameServerPeriod属性的值。

但是由于代码中Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),这个属性的范围只能指定在10秒~60秒。

所以,在默认情况下,Broker会在启动10秒后每隔30秒向NameServer发送心跳包,具体定时任务的业务在BrokerController#registerBrokerAll -> BrokerController#doRegisterBrokerAll -> BrokerOuterAPI#registerBrokerAll方法中。

final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    // 省略其他代码
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }
    // 省略其他代码
}

从上面的方法可以看出,Broker在发送心跳包时,会遍历集群中所有的所有NameServer通过调用BrokerOuterAPI#registerBroker方法发送心跳包,并且这是一个oneway消息,也就是说只需要关注是否发送成功,无需等待返回结果。

BrokerOuterAPI#registerBroker方法中,又会调用RemotingClient#invokeOneway方法,最终通过Netty来实现网络通信的。

2.2 NameServer 接收心跳包

BrokerNameServer发送心跳包之后,NameServer自然会进行接收,而在NameServer中接收网络请求的类叫做DefaultRequestProcessor,在这个类的processRequest方法中会对自定义的请求code进行switch-case的选择。

switch (request.getCode()) {
    // ...
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    // ...
}

由于当前的RocketMQ版本为4.7.0,所以这里选择的方法是registerBrokerWithFilterServer。而在这个方法中完成了数据的反序列化之后,会在上一篇文章中提到的RouteInfoManager类维护的Map中进行Broker的注册,最终会调用RouteInfoManager#registerBroker方法。

RouteInfoManager#registerBroker方法中,其实是基于心跳包的数据对RouteInfoManager对象的五个属性进行维护——clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable, filterServerTable。关于这些元信息所代表的意义在前文中已经描述过,这里就不再赘述,而对于RouteInfoManager#registerBroker方法的具体内容,在《RocketMQ技术内幕》一书中有简单的解释,或者感兴趣的同学也可以自行翻阅源码,相信读懂应该是不难的。

3. 服务删除

既然有服务注册,那必然有服务的反注册,也就是删除。

在上一篇文章中已经叙述过,NameServer会每隔10秒扫描一次RouteInfoManager#brokerLiveTable表,根据Broker对应的lastUpdateTimestamp时间戳是否超过120秒未更新来判断Broker是否需要删除。

这是第一种路由删除策略,还有一种就是:若Broker正常停机,Broker会发送一个标记为RequestCode.UNREGISTER_BROKER的消息,而在NameServer中会通过调用RouteInfoManager#unregisterBroker方法来进行服务的反注册,删除缓存在RouteInfoManager类中五个HashMap中对应的数据。

4. 服务发现

所谓的服务发现就是指使用一个注册中心来记录分布式系统中的全部服务的信息,以便其他服务能够快速的找到这些已注册的服务。

RocketMQ中就是当Topic信息发生变化以后(新增、修改或删除),NameSever会接收Broker发送的心跳包更新该Topic信息,但是NameSever并不主动将最新的路由信息推送给客户端,而是客户端定时拉取Topic的最新路由信息。

5. 小结

本文讲述了NameSever进行路由管理的基本原理,包括元信息的概念、服务注册、服务删除、服务发现。

对于NameSever实现这些功能的详细代码,主要是维护了RouteInfoManager类中的五个HashMap,在这里并没有做过多描述。

6. 参考资料

  • 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》

版权声明:本文为Planeswalker23所创,转载请带上原文链接,感谢。

文档信息

Search

    Table of Contents