RocketMQ消费者消息队列负载均衡
先从整体流程上简单梳理一下消息队列负载的过程。
消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。此时,可以计算当前消费者分配到消息队列集合,对比原先的负载队列与当前的分配队列。如果新队列集合中不包含原来的队列,则停止原先队列消息消费并移除,如果原先队列中不包含新分配队列则创建PullRequest。
何时会触发启动
- 每隔20s会自动进行一次
- 每次有新的consumer加入到消费组中时,就会执行一次。
提供的分配算法
- AllocateMessageQueueAveragely: 平均分配。
- AllocateMessageQueueAveragelyByCircle: 平均轮询分配
- AllocateMessageQueueConsistentHash: 一致性hash
- AllocateMessageQueueByConfig: 根据配置,为每一个消费者配置固定的消息队列。
- AllocateMessageQueueByMachineRoom: 根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。
启动
进行负载均衡是在RebalanceService线程中启动的,一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。
1 |
|
执行流程
1 | private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); |
1 | public class DefaultMQPushConsumerImpl implements MQConsumerInner |
从上面可以看出,MQClientinstance遍历已注册的消费者,对消费者执行doRebalance方法。
1 | protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = |
上面是遍历订阅信息对每个主题的队列进行重新负载。接下来将执行rebalanceByTopic方法,会根据广播模式或集群模式分别采用不同的方法进行处理。在此处,只解释集群模式下的方法。
1 | Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); |
获取该主题下的队列信息和该消费组内当前所有的消费者ID。每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象。
1 | if (mqSet != null && cidAll != null) { |
1 | public interface AllocateMessageQueueStrategy { |
对该主题下的队列信息和该消费组内当前所有的消费者ID进行排序,确保一个消费组的成员看到的顺序是一致的,防止同一个消费队列不会被多个消费者分配。
allocateResult记录的是当前消费者的所分配的消息队列
1 | Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); |
调用updateProcessQueueTableInRebalance
对比消息队列是否发生变化
1 | private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { |
1 |
|
从上面看,processQueueTable记录的是当前消费者负载的消息队列缓存表,该方法里面的mqSet记录的的是当前消费者经过负载分配后的消息队列集合。如果processQueueTable中的消息队列在mqSet中不存在,说明该消息队列已经被分配给其他消费者,所以需要暂停该消息队列消息的消费,通过** pq.setDropped(true);该语句即可。
然后通过removeUnnecessaryMessageQueue**方法判断是否该mq从缓存中移除。
之后,开始遍历本次负载分配给该消费者的消息队列结合mqSet。如果processQueueTable中没有包含该消息队列,表示这是本次新增加的消息队列。
首先从内存中移除该消息队列的消息进度,然后调用computePullFromWhere从磁盘中读取该消息队列的消费进度,创建一个PullRequest对象。
1 | public long computePullFromWhere(MessageQueue mq) { |
从上面看出,主要有三种计算消息进度的方法,有些大同小异。
CONSUME_FROM_LAST_OFFSET:从队列最新偏移量开始消费
首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,说明是首次消费,则从该消息队列的最大偏移量开始消费,如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。CONSUME_FROM_FIRST_OFFSET: 从头开始消费
首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,说明是首次消费,则返回0,从头开始消费,如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。CONSUME_FROM_TIMESTAMP: 从消费者启动的时间戳对应的消费进度开始消费
首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,尝试去操作消息存储时间戳作为消费者启动的时间戳,如果能找到则返回找到的偏移量,找不到则返回0;如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。
1 | this.dispatchPullRequest(pullRequestList); |
1 | public void dispatchPullRequest(List<PullRequest> pullRequestList) { |
在该方法的最后,会调用dispatchPullRequest方法,将PullRequest加入到PullMessageService中,以唤醒PullMessageService线程,进行消息拉取。
到这里,消费者负载均衡方面就结束了。