今天是学习RocketMQ
的第三天,继续来聊聊NameServer
。上一篇分析了启动流程、心跳机制以及优雅停机,我们知道除了心跳机制NameServer
还会接受broker
的注册并提供路由管理功能,今天就来聊聊NameServer
是如何进行路由管理的。
说到NameServer
的路由管理功能就不得不提到org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
这个类。
要知道,作为消息队列的“注册中心”,NameServer
不仅需要为生产者和消费者提供关于Topic
的路由信息,还得管理集群中的所有Broker
节点,包括时刻监控Broker
的存活状态、接收Broker
的注册信息,而这些路由的基础信息,都是集成在RouteInfoManager
这个类中的。
1. 元信息
从RouteInfoManager
类的命名就可以想到,这个类是路由信息的管理类,而通常我们将这些描述信息称之为“元信息”。
我们来看看RouteInfoManager
类都存了些什么内容,
// 关于topic的元信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker基础信息,包括broker所属集群的名称、broker的名称以及主备Broker地址。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker集群信息,存储集群中所有Broker的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker状态信息,主要是接收broker心跳包更新时间戳,用于路由删除
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker上的FilterServer列表,用于类模式消息过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
RouteInfoManager
类中有5个HashMap
,分别存放了不同的元信息,比如上一篇提到的brokerLiveTable
,就是存放Broker
节点发送来的的心跳包更新时间戳,NameServer
就是根据brokerLiveTable
属性中BrokerLiveInfo
对象的lastUpdateTimestamp
时间戳属性来判断该Broker
节点是否可用的。
在现在这个阶段我们不需要完全知道这些元信息各自代表了什么,日后讲到的时候再去源码中探索就可以,其实无非是一些数据结构而已。
2. 服务注册
说完了元信息,我们再来聊聊NameServer
实现路由管理的原理,首先是服务注册。既然是注册,那必然是“我”主动在某个网站上注册,在消息队列的场景中,就是是Broker
往NameServer
上注册,对于NameServer
而言只需要接收Broker
发送的心跳包然后更新相应的路由元信息即可,来看看源码中具体是如何实现的。
2.1 Broker 发送心跳包
既然是Broker
往NameServer
发送注册信息,那么当然要找Broker
的启动类了,按照上一篇所描述的方法,找到org.apache.rocketmq.broker.BrokerStartup
类的启动方法,在启动时会注册一个定时线程池。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
BrokerConfig#registerNameServerPeriod
属性的默认值是30秒,它决定了Broker
向NameServer
注册的时间间隔,我们可以通过在启动Broker
时用-c
参数指定配置文件,在配置文件中修改registerNameServerPeriod
属性的值。
但是由于代码中Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000))
,这个属性的范围只能指定在10秒~60秒。
所以,在默认情况下,Broker
会在启动10秒后每隔30秒向NameServer
发送心跳包,具体定时任务的业务在BrokerController#registerBrokerAll -> BrokerController#doRegisterBrokerAll -> BrokerOuterAPI#registerBrokerAll
方法中。
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 省略其他代码
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
// 省略其他代码
}
从上面的方法可以看出,Broker
在发送心跳包时,会遍历集群中所有的所有NameServer
通过调用BrokerOuterAPI#registerBroker
方法发送心跳包,并且这是一个oneway
消息,也就是说只需要关注是否发送成功,无需等待返回结果。
在BrokerOuterAPI#registerBroker
方法中,又会调用RemotingClient#invokeOneway
方法,最终通过Netty
来实现网络通信的。
2.2 NameServer 接收心跳包
当Broker
向NameServer
发送心跳包之后,NameServer
自然会进行接收,而在NameServer
中接收网络请求的类叫做DefaultRequestProcessor
,在这个类的processRequest
方法中会对自定义的请求code
进行switch-case
的选择。
switch (request.getCode()) {
// ...
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// ...
}
由于当前的RocketMQ
版本为4.7.0
,所以这里选择的方法是registerBrokerWithFilterServer
。而在这个方法中完成了数据的反序列化之后,会在上一篇文章中提到的RouteInfoManager
类维护的Map
中进行Broker
的注册,最终会调用RouteInfoManager#registerBroker
方法。
在RouteInfoManager#registerBroker
方法中,其实是基于心跳包的数据对RouteInfoManager
对象的五个属性进行维护——clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable, filterServerTable
。关于这些元信息所代表的意义在前文中已经描述过,这里就不再赘述,而对于RouteInfoManager#registerBroker
方法的具体内容,在《RocketMQ技术内幕》一书中有简单的解释,或者感兴趣的同学也可以自行翻阅源码,相信读懂应该是不难的。
3. 服务删除
既然有服务注册,那必然有服务的反注册,也就是删除。
在上一篇文章中已经叙述过,NameServer
会每隔10秒扫描一次RouteInfoManager#brokerLiveTable
表,根据Broker
对应的lastUpdateTimestamp
时间戳是否超过120秒未更新来判断Broker
是否需要删除。
这是第一种路由删除策略,还有一种就是:若Broker
正常停机,Broker
会发送一个标记为RequestCode.UNREGISTER_BROKER
的消息,而在NameServer
中会通过调用RouteInfoManager#unregisterBroker
方法来进行服务的反注册,删除缓存在RouteInfoManager
类中五个HashMap
中对应的数据。
4. 服务发现
所谓的服务发现就是指使用一个注册中心来记录分布式系统中的全部服务的信息,以便其他服务能够快速的找到这些已注册的服务。
在RocketMQ
中就是当Topic
信息发生变化以后(新增、修改或删除),NameSever
会接收Broker
发送的心跳包更新该Topic
信息,但是NameSever
并不主动将最新的路由信息推送给客户端,而是客户端定时拉取Topic
的最新路由信息。
5. 小结
本文讲述了NameSever
进行路由管理的基本原理,包括元信息的概念、服务注册、服务删除、服务发现。
对于NameSever
实现这些功能的详细代码,主要是维护了RouteInfoManager
类中的五个HashMap
,在这里并没有做过多描述。
6. 参考资料
- 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》
版权声明:本文为Planeswalker23所创,转载请带上原文链接,感谢。
文档信息
- 本文作者:Planeswalker23
- 本文链接:https://planeswalker23.github.io/2020/06/13/rocket-mq-3/
- 版权声明:本作品系原创,作者保留所有权利,未经作者允许,禁止转载和演绎。