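I. Preface

In the previous article we covered KVConfigManager, one of the core components of the NameServer. This article covers the other core component, RouteInfoManager, which manages routing data and holds the metadata for the entire message cluster.

II. RouteInfoManager Constructor and Fields

```java
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // Expiry time of a broker's long-lived connection: a channel idle for 2 minutes is considered expired
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    // Read-write lock guarding all route tables below
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // A topic is a logical concept made up of several queues,
    // and those queues are spread across different broker groups
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // brokerName -> BrokerData. A brokerName identifies a broker group,
    // and one BrokerData holds the data of every broker machine in that group
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // One NameServer can manage several broker clusters. Usually one cluster is enough, but a large
    // company with several lines of business may deploy an independent broker cluster per business,
    // all registered with the same NameServer
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // As the name suggests: tracks the long-lived connection to each broker,
    // whether its heartbeats are still arriving, and its keep-alive state
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Filter servers are an advanced RocketMQ feature. Tag-based filtering is simple but cannot express
    // complex, fine-grained rules. A filter server can be started on each broker machine; it opens a
    // long-lived connection to the local broker, registers with it and keeps heartbeats. You upload a
    // custom filter class to the filter server; when consuming, the broker first ships the data to the
    // local filter server, which filters it with your class and returns the result to the consumer.
    // Each broker machine can run one or more filter servers, all reported to the NameServer
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
    // ... remaining members omitted
}

public class QueueData implements Comparable<QueueData> {
    // Each queue is a data partition and always lives inside exactly one broker group
    private String brokerName;
    // Queues are split into write queues (routing for produced messages)
    // and read queues (routing for consumption):
    //  - 4 write / 4 read: messages are spread over the 4 write queues and consumed from the same 4.
    //  - 4 write / 2 read: messages are still spread over all 4 queues, but only 2 are read,
    //    so messages in the other 2 are never consumed.
    //  - 4 write / 8 read: messages go to only 4 queues, yet consumption covers 8.
    // Separating the two counts makes it possible to scale a topic's queues out and in.
    // To shrink from 8 write / 8 read: first set 4 write queues so new messages only land in those 4,
    // while the 8 read queues keep 4 queues consuming new data and let the other 4 drain their
    // remaining messages; then reduce the read queues from 8 to 4.
    private int readQueueNums;
    private int writeQueueNums;
    // Read/write permission of the topic on this broker group
    private int perm;
    private int topicSysFlag;
    // ...
}

public class BrokerData implements Comparable<BrokerData> {
    // Broker cluster topology: one cluster has several broker groups (brokerName), and each group has
    // several broker machines (master/slave replication for high availability).
    // The cluster this broker group belongs to
    private String cluster;
    // brokerName identifies the broker group
    private String brokerName;
    // The broker machines in this group
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    private final Random random = new Random();

    public BrokerData() {
    }
    // ...
}

class BrokerLiveInfo {
    // The broker reports heartbeats to the NameServer; each report refreshes this timestamp
    private long lastUpdateTimestamp;
    // Version of the broker's data
    private DataVersion dataVersion;
    // Netty channel: the long-lived network connection
    private Channel channel;
    // HA service address of this broker; slaves of the group use the master's value for replication
    private String haServerAddr;

    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
        String haServerAddr) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
        this.dataVersion = dataVersion;
        this.channel = channel;
        this.haServerAddr = haServerAddr;
    }
    // ...
}
```

topicQueueTable: routing information for the message queues of each topic; producers load-balance across it when sending. The key is the topic name and the value is a List<QueueData>, with one entry per broker group (brokerName) that hosts the topic. As the code above shows, each QueueData holds the read and write queue counts, the permission (perm) and the topic system flag.

brokerAddrTable: basic broker information. The key is brokerName; the value contains the brokerName, the cluster the broker belongs to, and the addresses of the master and slave brokers.

clusterAddrTable: broker cluster information. The key is the cluster name (clusterName); the value is the set of names (brokerName) of all broker groups in the cluster.

brokerLiveTable: broker status information. The NameServer replaces an entry each time it receives a heartbeat packet, and this is the table the NameServer scans every 10 seconds.

filterServerTable: the list of FilterServers on each broker, used for class-mode message filtering. Class-mode filtering has been deprecated since version 4.4.

III. Route Registration Flow

Acquire the write lock to prevent concurrent modification of the route tables.

Check whether the broker's cluster (clusterName) exists in clusterAddrTable; if not, create it. Then add the broker's name to the cluster's set of broker names.

Maintain the BrokerData: look it up in brokerAddrTable by broker name. If none exists, create a new BrokerData, store it in brokerAddrTable and set registerFirst to true. If it already exists, reuse it: the broker's address is put into its brokerAddrs map, replacing any previous address for the same brokerId, and registerFirst is true only if that brokerId had no address before. registerFirst being true means this is a first registration.

If the registering broker is a master, and its topic configuration has changed or this is its first registration, create or update the topic routing metadata (QueueData) and write it into topicQueueTable. In effect this also registers route information for the default topic, including MixAll.DEFAULT_TOPIC. When a producer sends to a topic that has not been created and the broker's BrokerConfig has autoCreateTopicEnable set to true, the route information of MixAll.DEFAULT_TOPIC is returned.

Update brokerLiveTable, which stores the brokers that are currently usable. BrokerLiveInfo is the key input for route removal.

Register the broker's filter server address list; one broker can be associated with several FilterServer message filtering servers.

If the broker is a slave, look up the master of its group and fill in the masterAddr (and the master's HA server address) in the result.

Finally, release the lock and return the registration result.

```java
public RegisterBrokerResult registerBroker(
    final String clusterName,                             // cluster the broker belongs to
    final String brokerAddr,                              // address of the broker machine
    final String brokerName,                              // name of the broker group
    final long brokerId,                                  // id of this broker machine (0 = master)
    final String haServerAddr,                            // HA service address of this broker
    final TopicConfigSerializeWrapper topicConfigWrapper, // topic queue data held by this broker
    final List<String> filterServerList,                  // filter servers deployed on this machine
    final Channel channel) {                              // physical Netty long-lived connection
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Registration takes the write lock to prevent concurrent modification of the route tables
            this.lock.writeLock().lockInterruptibly();

            // Add this broker group to its cluster. A Set is used because every machine of a group
            // registers separately, so the same group name arrives many times and must be de-duplicated
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            // First registration (of the group, or of this brokerId)?
            boolean registerFirst = false;

            // On the group's first registration, initialize its BrokerData
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // The machines of this group: brokerId -> address
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            // The same IP:PORT must only have one record in brokerAddrTable.
            // If this address was registered before under a different brokerId (e.g. the same machine
            // was started as a different broker node with another broker.conf), drop the stale entry
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }

            // Put this broker's address into the group's address map
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // Only a group's master reports the topics it manages; their queue data goes into topicQueueTable
            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                // Create or update topic routes if the topic config changed (dataVersion differs)
                // or on first registration
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Create or update the QueueData
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Maintain the keep-alive record for this broker
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(System.currentTimeMillis(), topicConfigWrapper.getDataVersion(),
                    channel, haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // Maintain the list of filter servers deployed on this broker machine
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // The registering machine is a slave of its group
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        // Return the HA server address of the group's master to the slave
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        // and the master's address as well
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

// Maintains a topic's queue data on each broker group
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    // Build the queue data
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    // If the topic has no queue data yet, create the list and store it in topicQueueTable
    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        // Otherwise replace this broker group's old entry if it changed
        boolean addNewOne = true;

        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    addNewOne = false;
                } else {
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
                    it.remove();
                }
            }
        }

        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}
```

The NameServer keeps a long-lived connection with each broker and stores the broker's status in brokerLiveTable. Each time it receives a heartbeat packet, it updates that broker's status in brokerLiveTable along with the route tables (topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable). These tables are plain HashMaps guarded by a read-write lock: many producers can read routes concurrently, which keeps message sending highly concurrent, while registration takes the write lock, so at any moment the NameServer processes only one broker heartbeat and multiple heartbeat requests execute serially.

IV. Heartbeat Handling in the NameServer

Heartbeat requests are received by the default request processor, DefaultRequestProcessor. Its queryBrokerTopicConfig method calls updateBrokerInfoUpdateTimestamp in RouteInfoManager, which keeps the broker alive by refreshing its timestamp.

Note: we will analyze the source of DefaultRequestProcessor in a later article.

```java
// When a broker sends periodic heartbeats to the NameServer,
// each heartbeat refreshes the timestamp of its keep-alive record
public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        prev.setLastUpdateTimestamp(System.currentTimeMillis());
    }
}
```

V. Route Removal Flow

Every 10 s the NameServer scans brokerLiveTable. If a broker's lastUpdateTimestamp is more than 120 s older than the current time, the broker is considered dead: the NameServer removes it, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable and filterServerTable.

RocketMQ has two triggers for route removal: (1) the NameServer periodically scans brokerLiveTable, compares the timestamp of the last heartbeat with the current system time, and removes the broker if the gap exceeds 120 s; (2) on a normal shutdown, the broker executes unregisterBroker to remove its own information.

1. NameServer periodically scans brokerLiveTable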
The scan runs every 10 s and its logic is simple: iterate over the brokerLiveTable route table (a HashMap) and check each BrokerLiveInfo's lastUpdateTimestamp, i.e. when its last heartbeat arrived. If that was more than 120 s ago, the broker is considered unavailable: it is removed, its connection is closed, and finally all route information related to it is deleted.

```java
// Periodic broker keep-alive scan: if a broker machine has not communicated with the NameServer
// for more than 2 minutes, close the physical connection and clean up its in-memory data
public void scanNotActiveBroker() {
    // Scan the brokerLiveTable route table
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // Core liveness check: no heartbeat/registration request for two minutes
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
```

onChannelDestroy then works as follows.

Acquire the read lock; if the Channel is not null, iterate over brokerLiveTable to find the broker that uses this Channel. Release the read lock.

Acquire the write lock and remove the broker's entries from brokerLiveTable and filterServerTable by brokerAddr.

Maintain brokerAddrTable: iterate over it, find the broker in a BrokerData's brokerAddrs and remove it. If that BrokerData then contains no other broker, remove the entry for its brokerName from brokerAddrTable.

Only if the whole broker group was removed, maintain clusterAddrTable, again by iteration: find the broker group and remove it from its cluster. If the cluster then contains no broker, remove the cluster from clusterAddrTable.

Likewise, maintain topicQueueTable: iterate over the queues of every topic and remove those belonging to the removed broker group; if a topic only had queues on that group, remove the topic from topicQueueTable.

Release the write lock; route removal is complete.

```java
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // Acquire the read lock
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                // Iterate over brokerLiveTable
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    // Find the brokerAddr that uses this channel
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    // The channel is null or no broker uses it: fall back to the remote address
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // Acquire the write lock
                this.lock.writeLock().lockInterruptibly();
                // Remove the broker's entries from brokerLiveTable and filterServerTable by brokerAddr
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // Iterate over brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Remove this brokerAddr
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                        }
                    }

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    // Iterate over clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            // If the cluster has no broker left, remove it from clusterAddrTable
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }
                        }
                    }
                }

                if (removeBrokerName) {
                    // Iterate over topicQueueTable
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        // If the topic has no queues left, remove it
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
```

2. Taking a broker offline with unregisterBroker

A broker can register itself, and it can also take itself offline from its cluster and group.

```java
public void unregisterBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed", brokerAddr);

            this.filterServerTable.remove(brokerAddr);

            boolean removeBrokerName = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                String addr = brokerData.getBrokerAddrs().remove(brokerId);
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed", brokerAddr);

                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", brokerName);
                    removeBrokerName = true;
                }
            }

            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed", brokerName);

                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}", clusterName);
                    }
                }
                this.removeTopicByBrokerName(brokerName);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    }
}

// Removes the topic queue data managed by a broker group
private void removeTopicByBrokerName(final String brokerName) {
    Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
    while (itMap.hasNext()) {
        Entry<String, List<QueueData>> entry = itMap.next();

        String topic = entry.getKey();
        List<QueueData> queueDataList = entry.getValue();
        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            if (qd.getBrokerName().equals(brokerName)) {
                log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd);
                it.remove();
            }
        }

        if (queueDataList.isEmpty()) {
            log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
            itMap.remove();
        }
    }
}
```
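To see the registration, heartbeat and removal paths working together, here is a minimal, self-contained sketch. It is not RocketMQ code: the class MiniRouteInfo, its methods (register, heartbeat, scanNotActive, unregister, removeAddr) and the topicRoutes table are hypothetical. It assumes several simplifications: one synchronized monitor stands in for the ReentrantReadWriteLock; timestamps are passed in explicitly instead of calling System.currentTimeMillis(); topicQueueTable is reduced to topic -> set of brokerNames (no QueueData, filterServerTable or Netty channels); and unregister is keyed by address rather than by brokerName/brokerId. It does keep the rules discussed above: the 120 s expiry window, the one-record-per-IP:PORT cleanup, master-only topic publishing, and the cascade brokerAddrTable -> clusterAddrTable -> topic routes that only fires once a whole broker group is gone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of RouteInfoManager's tables (not RocketMQ's API).
class MiniRouteInfo {
    static final long EXPIRE_MS = 1000L * 60 * 2; // same 2-minute window as BROKER_CHANNEL_EXPIRED_TIME
    static final long MASTER_ID = 0L;             // mirrors MixAll.MASTER_ID

    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // clusterName -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // brokerAddr -> last heartbeat (ms)
    final Map<String, Set<String>> topicRoutes = new HashMap<>();           // topic -> brokerNames hosting it

    // Registration (assumption: a single monitor replaces the read-write lock).
    synchronized void register(String cluster, String brokerName, long brokerId,
                               String addr, List<String> topics, long now) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
        // The same IP:PORT may appear only once: drop it if recorded under another brokerId.
        addrs.entrySet().removeIf(e -> e.getValue().equals(addr) && e.getKey() != brokerId);
        addrs.put(brokerId, addr);
        if (brokerId == MASTER_ID) { // only the master publishes topic routes
            for (String topic : topics) {
                topicRoutes.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
            }
        }
        brokerLiveTable.put(addr, now);
    }

    // Heartbeat: refresh the timestamp of an already-registered broker only.
    synchronized void heartbeat(String addr, long now) {
        brokerLiveTable.computeIfPresent(addr, (k, v) -> now);
    }

    // Periodic scan: expire brokers whose last heartbeat is older than EXPIRE_MS.
    synchronized List<String> scanNotActive(long now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + EXPIRE_MS < now) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        for (String addr : expired) {
            removeAddr(addr);
        }
        return expired;
    }

    // Graceful shutdown path, simplified to be keyed by address.
    synchronized void unregister(String addr) {
        removeAddr(addr);
    }

    // Cascading cleanup in the same order as onChannelDestroy.
    private void removeAddr(String addr) {
        brokerLiveTable.remove(addr);
        String emptiedGroup = null;
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> group = it.next();
            if (group.getValue().values().remove(addr) && group.getValue().isEmpty()) {
                emptiedGroup = group.getKey();
                it.remove(); // the last machine of the group is gone
            }
        }
        if (emptiedGroup == null) {
            return; // other machines of the group are alive: keep cluster and topic routes
        }
        final String name = emptiedGroup;
        clusterAddrTable.values().forEach(names -> names.remove(name));
        clusterAddrTable.values().removeIf(Set::isEmpty);
        topicRoutes.values().forEach(names -> names.remove(name));
        topicRoutes.values().removeIf(Set::isEmpty);
    }
}
```

For example, register a master at time 0 and a slave of the same group at time 0, then send a heartbeat for the master only at 100 s: a scan at 130 s expires just the slave and leaves the group, cluster and topic routes in place, while a scan at 300 s also expires the master and cascades the removal all the way to the cluster and topic tables. Passing `now` in explicitly is a design choice for the sketch: it makes the 120 s rule deterministic to exercise without waiting on a real clock.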