大家好,我是君哥。
RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?
首先看一下 RocketMQ 的架构,如下图:
RocketMQ 的 Broker 注册到 NameServer 集群,而生产者和消费者则需要从 NameServer 拉取消息。
Broker 启动时,会向 NameServer 发送注册消息,相关的 UML 类图如下:
我们看一下 BrokerOuterAPI 的 registerBrokerAll 方法,代码如下:
//BrokerOuterAPI.java public List<RegisterBrokerResult> registerBrokerAll( //省略参数 final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { //省略 requestHeader 封装 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body); if (result != null) { registerBrokerResultList.add(result); } } catch (Exception e) { } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
可以看到,当 Broker 启动时,会向所有的 NameServer 发送注册消息,NameServer 端的注册内容如下:
从上图中看出,需要在 NameServer 上保存的数据其实是很少的。
注意:
1.Broker 向 NameServer 注册时,会注册到所有的 NameServer 服务器, NameServer 并不是分布式存储,NameServer 集群是去中心化的。
2.NameServer 会有定时任务(每 10s 一次)检查 Broker 是否下线了,判断依据是 120s 内有没有收到心跳,如果没有收到,则关闭 channel,把 Broker 信息从本地缓存移除。代码见 RouteInfoManager 类 scanNotActiveBroker 方法。
3.Broker 启动时,同时会启动定时任务,每 30s 向 NameServer 发送注册消息,NameServer 收到注册消息后更新心跳时间(BrokerLiveInfo.lastUpdateTimestamp)。
下面是 Broker 对 NameServer 的两个请求码:
注册:RequestCode.REGISTER_BROKER心跳:RequestCode.QUERY_DATA_VERSION创建 Topic 时,Broker 会向 NameServer 发送注册消息。代码如下:
//BrokerController 类 public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { TopicConfig registerTopicConfig = topicConfig; //省略 ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setDataVersion(dataVersion); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); }
最终调用了上一节的 registerBrokerAll 的方法。NameServer 收到注册消息后更新本地保存的数据,所保存的数据并没有增加新数据。
对于生产者和消费者,在发送和拉取消息时,首先会从本地缓存获取 Topic 路由信息,如果获取失败,则需要从 NameServer 进行获取。下面是获取 Topic 路由信息的 UML 类图:
看一下更新 Topic 路由信息的核心代码:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { //根据默认 Topic 来取 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), clientConfig.getMqClientApiTimeout()); //省略部分逻辑 } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } if (topicRouteData != null) { //判断路由信息是否发送变化 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else {} if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info if (!producerTable.isEmpty()) { //更新生产者缓存 } // Update sub info if (!consumerTable.isEmpty()) { //更新消费者缓存 } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId); } } catch (MQClientException e) { } catch (RemotingException e) { } finally { this.lockNamesrv.unlock(); } } else {} } catch (InterruptedException e) { } return false; }
注意:客户端会有定时任务,默认每隔 30s 向 NameServer 拉取 Topic 路由信息来刷新本地缓存。
RocketMQ 设计之初也是使用 Zookeeper 做注册中心的,这参考了 Kafka 的设计。Zookeeper 是一个非常成熟的注册中心,还有支持主节点选举、强一致等特性。使用 Zookeeper 的架构如下:
从上面的分析中可以看到,RocketMQ 需要保存的数据非常少,完全不必引入 Zookeeper 这种重量级的注册中心。
NameServer 集群各节点是对等的,相互之间并不会进行通信,这样确实会有短暂不一致。Broker 启动时会跟所有的 NameServer 建立长链接,发送注册信息。注册成功后,每 30s 会向 NameServer 发送心跳,NameServer 收到心跳后更新 Broker 的 lastUpdateTimestamp。
Zookeeper 使用 ZAB 协议来保证节点之间数据的强一致性,这要求在每一个写请求都需要在节点上写事务日志,同时需要将内存数据持久化到磁盘以保证一致性和持久性。对于 RocketMQ 这种元数据非常少的简单场景,有点小题大做了。
放弃强一致而选择可用性也是 RocketMQ 放弃 Zookeeper 的选择,这也让 NameServer 的设计更加简单。
NameServer 处理 Broker 注册的时候,考虑到多个 Broker 并发注册的问题,保存路由信息时采用了 ReadWriteLock 中的写锁,代码如下:
public RegisterBrokerResult registerBroker( //省略参数 final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { } return result; }
如果有新的 Broker 加入时,NameServer 并不会主动向客户端推送新的 Broker 信息,而是需要客户端的定时任务(30s 一次)去主动拉取,这样客户端保存的路由信息跟 NameServer 会有短暂的不一致。
同样,Broker 掉线后,NameServer 会用定时任务(10s 一次)检测 Broker 最后更新时间是否超过 120s,如果超过就把 Broker 路由信息删除。在客户端,同样需要定时任务(30s 一次)去主动拉取,客户端保存的路由信息跟 NameServer 也会有短暂的不一致。
从上面分析看到,NameServer 集群各节点是对等的,当集群有压力时,横向扩展非常容易。而 Zookeeper 在写扩展方面非常不灵活。
在 Broker 主从集群中,RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以基于 DLedger 进行日志复制。如果 Master 节点发生故障,可以基于 DLedger 自动进行主从切换。这可以完全不依赖于 Zookeeper 的实现。
如果引入 Zookeeper,运维人员必须要具备运维 Zookeeper 的能力,这又增加了运维的复杂性。