为什么会有NameServer 消息中间件一般基于主题的订阅发布机制,消息生产者会发送某一主体(Topic)的消息到消息服务器(Broker),消息服务器负责该消息的持久化存储,消息消费者订阅感兴趣的主题。通常情况下,为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息生产者如何 知道消息要发往哪台消息服务器呢?如果某一台消息服务器宕机了,那么生产者如何在不重启服务的情况下感知。NameServer可以 解决上述问题。
Broker消息服务器在启动的时候向所有NameServer注册,消息生产者在发送消息之前先从NameServer获取Broker服务器地址列表,然后 根据负载均衡算法从列表中选择一台消息服务器进行消息发送,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息生产者。
NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同 。
NameServer作用及重要变量 NameServer存储路由的基础信息,还能够管理Broker节点,包括路由注册、路由删除等功能。
1 2 3 4 private final HashMap<String, List<QueueData>> topicQueueTable; private final HashMap<String, BrokerData> brokerAddrTable; private final HashMap<String, Set<String>> clusterAddrTable; private final HashMap<String, BrokerLiveInfo> brokerLiveTable; private final HashMap<String, List<String>> filterServerTable;
从上面可以看出数据类型都是HashMap, 其中,QueueData记录的是消息队列的信息。
1 2 3 4 5 6 7 8 9 10 public class QueueData implements Comparable <QueueData > { private String brokerName; # 读队列数量 private int readQueueNums; # 写队列数量 private int writeQueueNums; # 读写权限,具体含义参考PermName private int perm; # topic同步标记,具体含义参考TopicSysFlag private int topicSynFlag;
记录集群信息,存储集群中所有Broker名称
1 2 3 4 public class BrokerData implements Comparable <BrokerData > { private String cluster; private String brokerName; private HashMap<Long, String> brokerAddrs;
Broker状态信息
1 2 3 4 5 class BrokerLiveInfo { private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr;
topicQueueTable: Topic消息队列路由信息 ,消息发送时根据路由表进行负载均衡。
brokerAddrTable: Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
clusterAddrTable: Broker状态信息,存储集群中所有Broker名称。
brokerLiveTable: Broker状态信息。NameServer每次收到心跳包时会替换该信息。
filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。
路由注册 Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。
NameServer处理心跳包 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public RegisterBrokerResult registerBroker ( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this .lock.writeLock().lockInterruptibly(); Set<String> brokerNames = this .clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this .clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false ; BrokerData brokerData = this .brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true ; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this .brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this .isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null ) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this .createAndUpdateQueueData(brokerName, entry.getValue()); } } } } BrokerLiveInfo prevBrokerLiveInfo = this .brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}" , brokerAddr, haServerAddr); } if (filterServerList != null ) { if (filterServerList.isEmpty()) { this .filterServerTable.remove(brokerAddr); } else { this .filterServerTable.put(brokerAddr, filterServerList); } } if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null ) { BrokerLiveInfo brokerLiveInfo = this .brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null ) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this .lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception" , e); } return result; }
路由注册需要加写锁,防止并发修改RouteInfoManager中的路由表。首先判断Broker所属集群是否存在,如果不存在,则创建,然后将broker名加入到集群Broker集合中。
维护BrokerData信息,首先从brokerAddrTable根据BrokerName尝试获取Broker信息,如果不存在,则新建BrokerData并放入到brokerAddrTable,registerFirst设置为true;如果存在,直接替换原来的,registerFirst设置为false,表示非第一次注册。
如果Broker为Master,并且Broker Topic配置信息发生变化或者是初次注册,则需要创建或更新Topic路由元数据,填充topicQueueTable,其实就是为默认主题自动注册路由信息。根据TopicConfig创建QueueData的数据结构,然后更新topicQueueTable。
更新BrokerLiveInfo,存活Broker信息表,BrokerLiveInfo是执行路由删除的重要依据。
注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器。
路由删除 NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。