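After messages are pulled, they are stored in the ProcessQueue (the message processing queue) and then consumed.
RocketMQ implements consumption through ConsumeMessageService, which supports both orderly and concurrent consumption. This post focuses on the concurrent consumption flow.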
```java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    // the application's concurrent message listener
    private final MessageListenerConcurrently messageListener;
    // task queue backing the consume thread pool
    private final BlockingQueue<Runnable> consumeRequestQueue;
    // thread pool that runs ConsumeRequest tasks
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    // used to resubmit consume requests after a delay (submitConsumeRequestLater)
    private final ScheduledExecutorService scheduledExecutorService;
    // periodically cleans up expired messages
    private final ScheduledExecutorService cleanExpireMsgExecutors;
    // ...
}
```
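Entry point: after messages are pulled from the broker, the PullCallback callback first puts them into the ProcessQueue. It then submits them to the consume thread pool by calling ConsumeMessageConcurrentlyService.submitConsumeRequest, which starts consumption.
Parameters:
- msgs: the list of messages
- processQueue: the message processing queue
- messageQueue: the message queue the messages belong to
- dispatchToConsume: whether to dispatch to the consume thread pool; ignored for concurrent consumption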
```java
public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
    // number of messages per ConsumeRequest (consumeMessageBatchMaxSize)
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        // the whole batch fits into a single ConsumeRequest
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // the pool rejected the task: resubmit it later
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // page through msgs, consumeBatchSize messages at a time
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // fold all remaining messages into this request and resubmit it later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
```
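First, the method reads consumeMessageBatchMaxSize. This is the number of messages in one consume task (ConsumeRequest), and it defaults to 1. A single pull returns at most 32 messages by default (pullBatchSize). When msgs holds more than consumeMessageBatchMaxSize messages, they are split into pages. Each ConsumeRequest is then submitted to the consume thread pool, which executes its run method. If the pool rejects a request, the request is resubmitted later through submitConsumeRequestLater.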
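The paging loop can be isolated into a small helper. The sketch below uses a hypothetical BatchSplitter class, which is not part of RocketMQ. It reproduces only the splitting logic, with a generic element type standing in for MessageExt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a pulled batch into chunks of at most
// consumeBatchSize elements, the way submitConsumeRequest builds one
// ConsumeRequest per chunk. T stands in for MessageExt.
final class BatchSplitter {
    private BatchSplitter() {}

    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        // Same shape as the source: the inner loop advances the outer cursor `total`.
        for (int total = 0; total < msgs.size(); ) {
            List<T> msgThis = new ArrayList<>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }
            batches.add(msgThis);
        }
        return batches;
    }
}
```

With the default batch size of 1, a 32-message pull would yield 32 single-message chunks. Each chunk corresponds to one ConsumeRequest task.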
```java
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;
    // ... constructor, getters and run() follow
}
```
Below, the run method is broken into pieces and explained step by step.
```java
if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}",
        ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}
```
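First, the method checks whether the processQueue's dropped flag is true. If it is, this message queue has been reassigned to another consumer in the same consumer group, so consumption of this queue must stop.
If consume message hooks are registered, they are executed next.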
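```java
// listener, context and status are declared earlier in run()
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
    // restore the original topic of retry messages
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            // record when consumption of each message started
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    // call the application's listener with a read-only view of the batch
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    // (excerpt) the exception is logged, then flagged
    hasException = true;
}
```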
```java
public void resetRetryTopic(final List<MessageExt> msgs) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }
    }
}
```
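resetRetryTopic restores the original topic name of retry messages, so the listener sees the original topic. If a message's topic equals this group's retry topic (%RETRY% + consumer group) and its PROPERTY_RETRY_TOPIC property is set, the topic is changed back to the value of that property.

For background, consider how RocketMQ stores a delayed message in the commitlog. If the message's delay level (delayTimeLevel) is greater than 0, RocketMQ first saves the retry topic in the message properties. It then changes the topic to SCHEDULE_TOPIC_XXXX, so that the message takes part in consumption again once the delay expires.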
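The renaming rule can be checked in isolation. The sketch below replaces MessageExt with a hypothetical SimpleMessage class that holds only a topic and a property map. The constants "%RETRY%" and "RETRY_TOPIC" are assumed to match RocketMQ's retry-topic prefix and PROPERTY_RETRY_TOPIC key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MessageExt: just a topic and a property map.
final class SimpleMessage {
    String topic;
    final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String topic) {
        this.topic = topic;
    }
}

final class RetryTopicReset {
    // Assumed to match the retry-topic prefix and MessageConst.PROPERTY_RETRY_TOPIC.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";

    static void resetRetryTopic(String consumerGroup, List<SimpleMessage> msgs) {
        final String groupTopic = RETRY_PREFIX + consumerGroup;
        for (SimpleMessage msg : msgs) {
            String retryTopic = msg.properties.get(PROPERTY_RETRY_TOPIC);
            // Only messages delivered through this group's retry topic are renamed.
            if (retryTopic != null && groupTopic.equals(msg.topic)) {
                msg.topic = retryTopic;
            }
        }
    }
}
```

Ordinary messages, whose topic is not the group's retry topic, pass through unchanged.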
Next comes the actual consumption: the application listener's consumeMessage method is called, and it returns the consume status.
```java
if (null == status) {
    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
```
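The listener returns either CONSUME_SUCCESS (consumed successfully) or RECONSUME_LATER (consume again later). A null return value is treated as RECONSUME_LATER.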
```java
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
```
After consumption, the ProcessQueue is checked again. If dropped is true, the result is not processed; otherwise processConsumeResult is called.
```java
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
    return;

switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}
```
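When the result is CONSUME_SUCCESS, ackIndex is clamped to at most size - 1. The numbers of successful (ackIndex + 1) and failed messages are then computed for the statistics. When the result is RECONSUME_LATER, ackIndex is reset to -1, so every message in the batch counts as failed.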
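The arithmetic can be sketched on its own. In the RocketMQ versions I am aware of, ConsumeConcurrentlyContext initializes ackIndex to Integer.MAX_VALUE. So unless the listener lowers it, a successful batch counts as fully successful; treat that default as an assumption to verify against your version. ConsumeStatusSketch, AckResult and AckCalculator are hypothetical names.

```java
// Hypothetical mirror of ConsumeConcurrentlyStatus.
enum ConsumeStatusSketch { CONSUME_SUCCESS, RECONSUME_LATER }

// Effective ackIndex plus the counters that would go to the stats manager.
final class AckResult {
    final int ackIndex;
    final int ok;
    final int failed;

    AckResult(int ackIndex, int ok, int failed) {
        this.ackIndex = ackIndex;
        this.ok = ok;
        this.failed = failed;
    }
}

final class AckCalculator {
    private AckCalculator() {}

    static AckResult compute(ConsumeStatusSketch status, int ackIndex, int batchSize) {
        if (status == ConsumeStatusSketch.CONSUME_SUCCESS) {
            // Clamp: an ackIndex beyond the batch means every message succeeded.
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;
            }
            int ok = ackIndex + 1;
            return new AckResult(ackIndex, ok, batchSize - ok);
        }
        // RECONSUME_LATER: nothing is acknowledged, the whole batch counts as failed.
        return new AckResult(-1, 0, batchSize);
    }
}
```

A listener that sets ackIndex to 1 in a batch of four therefore reports two successes and two failures. The two failed messages are the ones later sent back.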
```java
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}
```
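In broadcasting mode, failed messages are not retried. They are only logged at WARN level and then dropped.
In clustering mode, sendMessageBack is called for every message after ackIndex (that is, every message that failed to consume) to send it back to the broker. If sending a message back fails, its reconsume count is incremented and it is added to the msgBackFailed collection. Those messages are then removed from the request's original message list and passed to submitConsumeRequestLater, which consumes them again locally.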
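The clustering branch reduces to a simple partition. The sketch below uses a hypothetical ClusteringRetry class. The Predicate sendBack is a stand-in for sendMessageBack(msg, context) that returns false when the broker call fails. For brevity the sketch omits the reconsume-count increment that the real code performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the CLUSTERING branch of processConsumeResult.
final class ClusteringRetry {
    private ClusteringRetry() {}

    // Sends back every message after ackIndex. Messages whose send-back fails
    // are removed from `msgs` and returned, to be consumed again locally later.
    static <T> List<T> collectSendBackFailures(List<T> msgs, int ackIndex, Predicate<T> sendBack) {
        List<T> msgBackFailed = new ArrayList<>(msgs.size());
        for (int i = ackIndex + 1; i < msgs.size(); i++) {
            T msg = msgs.get(i);
            if (!sendBack.test(msg)) {
                msgBackFailed.add(msg);
            }
        }
        // What stays in `msgs` (consumed, or successfully sent back) is what
        // will be removed from the ProcessQueue and acknowledged by offset.
        msgs.removeAll(msgBackFailed);
        return msgBackFailed;
    }
}
```

Messages that were handed to the broker count as done for this consumer. The broker is now responsible for redelivering them through the retry topic.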
```java
private void submitConsumeRequestLater(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue
) {

    this.scheduledExecutorService.schedule(new Runnable() {

        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
```
As the code shows, submitConsumeRequest (introduced at the beginning) is called again after 5 seconds.
```java
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
```
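The consumed batch is removed from the ProcessQueue. The returned offset is the smallest offset still left in the ProcessQueue after the removal, and it is used to update the consume progress. After a restart, the consumer then resumes from the last recorded progress. Because only the smallest unfinished offset is committed, messages above it that had already finished may be consumed again after a restart. This approach keeps duplicate consumption low but cannot rule it out.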
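Why commit the smallest remaining offset? Under concurrent consumption, messages finish out of order, so only the lowest unfinished offset is safe to persist. The sketch below is loosely modeled on ProcessQueue's msgTreeMap and queueOffsetMax. The class InflightOffsets and its methods are hypothetical simplifications. For example, it omits the real method's -1 result when the map was already empty.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of ProcessQueue offset tracking.
final class InflightOffsets {
    // offset -> message, sorted by offset
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1;

    void put(long offset, String msg) {
        msgTreeMap.put(offset, msg);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes finished messages and returns the offset that is safe to commit:
    // the smallest offset still in flight, or one past the largest seen if none remain.
    long remove(List<Long> finished) {
        for (Long off : finished) {
            msgTreeMap.remove(off);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```

Suppose offsets 100 to 103 are in flight and 101 and 103 finish first. The committable offset stays at 100 until message 100 itself completes.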
Next, let's look at sendMessageBack.
```java
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}
```
```java
// DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        // fallback: publish a copy directly to the retry topic via the client's internal producer
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        // delay grows with the number of previous attempts
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}
```
The MQClientAPIImpl class:
```java
public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
```
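The network client entry point for sending the ACK message is MQClientAPIImpl#consumerSendMessageBack, with the command code RequestCode.CONSUMER_SEND_MSG_BACK.
The client sends RequestCode.CONSUMER_SEND_MSG_BACK to the broker synchronously, and SendMessageProcessor#consumerSendMsgBack handles it on the broker side.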
```java
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
    (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
    ConsumeMessageContext context = new ConsumeMessageContext();
    context.setConsumerGroup(requestHeader.getGroup());
    context.setTopic(requestHeader.getOriginTopic());
    context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
    context.setCommercialRcvTimes(1);
    context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

    this.executeConsumeMessageHookAfter(context);
}

SubscriptionGroupConfig subscriptionGroupConfig =
    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
        + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    return response;
}

if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
    return response;
}

if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
```
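The broker looks up the consumer group's subscription configuration; if none exists, it returns a "subscription group not exist" error. If the broker itself is not writable, it returns NO_PERMISSION. If the number of retry queues is less than 1, it returns success immediately, which means this consumer group does not support retries.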
```java
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    newTopic,
    subscriptionGroupConfig.getRetryQueueNums(),
    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("topic[" + newTopic + "] not exist");
    return response;
}

if (!PermName.isWriteable(topicConfig.getPerm())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
    return response;
}
```
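When a message needs to be consumed again, it is not written back to the original message queue. Instead, the broker uses a retry topic named %RETRY% + consumer group name, creating it if it does not exist yet. It picks one of the retry queues at random and builds the TopicConfig for that topic.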
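The queue-selection expression has a subtle detail. Math.abs(Integer.MIN_VALUE) overflows and stays negative. Applying % 99999999 before Math.abs first bounds the value, so the result is always a valid queue index. The sketch below isolates that expression in a hypothetical RetryQueueSelector class; the "%RETRY%" prefix is assumed to match MixAll.getRetryTopic.

```java
import java.util.Random;

// Hypothetical helper reproducing the broker's retry-queue selection.
final class RetryQueueSelector {
    // Assumed to match MixAll.getRetryTopic(group).
    static final String RETRY_PREFIX = "%RETRY%";

    private RetryQueueSelector() {}

    static String retryTopic(String group) {
        return RETRY_PREFIX + group;
    }

    // Same expression as the excerpt: the inner modulo keeps the operand of
    // Math.abs within (-99999999, 99999999), so abs() never sees Integer.MIN_VALUE.
    static int pickQueue(Random random, int retryQueueNums) {
        return Math.abs(random.nextInt() % 99999999) % retryQueueNums;
    }
}
```

Calling Math.abs(random.nextInt()) directly would return a negative queue ID in the rare case where nextInt() yields Integer.MIN_VALUE.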
```java
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
    MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
```
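The message is read from the commitlog by its physical offset. If the PROPERTY_RETRY_TOPIC property is not set yet, which is the case the first time a message is sent back, the message's current topic is stored in it.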
```java
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
}
```
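Next, the maximum number of retries is determined. It comes from the subscription group's retryMaxTimes, or, for clients of version V3_4_9 and later, from the request header. If the message has already been retried maxReconsumeTimes times or more, or the delay level is negative, newTopic is changed to the dead-letter topic (DLQ), %DLQ% + consumer group name. This topic is created write-only. Once a message enters the DLQ, RocketMQ no longer schedules it for consumption, and manual intervention is required.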
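The routing rule fits in one function. The sketch below uses a hypothetical SendBackRouting class; the "%RETRY%" and "%DLQ%" prefixes are assumed to match MixAll.getRetryTopic and MixAll.getDLQTopic. As far as I know, a listener can set delayLevelWhenNextConsume to -1 to send a message straight to the DLQ. That is what the delayLevel < 0 branch covers, but verify it against your client version.

```java
// Hypothetical sketch of the retry-vs-DLQ decision in consumerSendMsgBack.
final class SendBackRouting {
    // Assumed to match MixAll.getRetryTopic / MixAll.getDLQTopic.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    private SendBackRouting() {}

    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        // Retries exhausted, or the client explicitly asked for no retry.
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return DLQ_PREFIX + group;
        }
        return RETRY_PREFIX + group;
    }
}
```

With a subscription group's retryMaxTimes of 16 (the default in the versions I know), the 17th failure lands in %DLQ% + group.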
```java
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
```
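A new message object is created from the original. The retry message gets its own unique message ID and is stored in the commitlog as a new entry; the original message is not updated. The original topic and message ID are kept in the message properties. The topic is set to newTopic (the retry topic, or the DLQ topic), the reconsume count is incremented by 1, and all other attributes stay the same as in the original message.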
```java
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
```
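The message is stored in the commitlog.
Before the write, the broker checks the delay level. If delayTimeLevel is greater than 0, the message's topic and queue are replaced with the scheduled-message topic "SCHEDULE_TOPIC_XXXX" and the queue ID delayLevel - 1. The original topic and queue ID are saved in the message properties.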
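The mapping from delay level to schedule queue and delay time can be sketched as follows. The sketch assumes the broker's default messageDelayLevel setting, "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", where level N is the N-th entry. DelayLevels and its methods are hypothetical names. The client-side fallback shown earlier uses level 3 + reconsumeTimes, so a first retry would wait 10 seconds under these defaults.

```java
// Hypothetical helper mapping delay levels to SCHEDULE_TOPIC_XXXX queues and delays.
final class DelayLevels {
    // Assumed default of the broker's messageDelayLevel setting (levels 1..18).
    static final String DEFAULT_MESSAGE_DELAY_LEVEL =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {}

    // Queue ID under SCHEDULE_TOPIC_XXXX is the delay level minus 1.
    static int scheduleQueueId(int delayLevel) {
        return delayLevel - 1;
    }

    // Parses the level table; level N maps to entry N - 1.
    static long delayMillis(String table, int delayLevel) {
        String[] levels = table.trim().split("\\s+");
        String s = levels[delayLevel - 1];
        char unit = s.charAt(s.length() - 1);
        long value = Long.parseLong(s.substring(0, s.length() - 1));
        switch (unit) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            case 'd': return value * 86_400_000L;
            default: throw new IllegalArgumentException("unknown unit in " + s);
        }
    }
}
```

Because each level has its own queue, all messages in one queue share the same delay. Queue order is therefore also delivery-time order.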
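On the broker, a background service called ScheduleMessageService runs a timer task, DeliverDelayedMessageTimerTask. Using the topic "SCHEDULE_TOPIC_XXXX" and the queue ID, the task first locates the logical consume queue (ConsumeQueue). By offset, it finds the entry in the ConsumeQueue's memory-mapped file and reads the MessageExt from the commitlog. It then converts the message from a scheduled delay-queue message back into a retry-queue message, restoring the real topic and queue ID from the properties. The converted message is persisted to disk again, and only then is it actually stored in the retry queue. The scheduled delay queue is only temporary storage: once the delay has elapsed, the message is moved into the retry queue.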
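The delivery loop can be illustrated with a toy in-memory model of a single delay level. This is a hypothetical sketch, not ScheduleMessageService itself: there is no commitlog, ConsumeQueue or persistence, and the names DelayLevelQueue, offer and deliverDue are invented. The model assumes messages are offered in non-decreasing storeTimestamp order. Under that assumption, the task only needs to inspect the head of the queue and can stop at the first message that is not yet due.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model of one delay level: FIFO order equals deliver-time order.
final class DelayLevelQueue {
    private static final class Entry {
        final long deliverAt;
        final String msg;

        Entry(long deliverAt, String msg) {
            this.deliverAt = deliverAt;
            this.msg = msg;
        }
    }

    private final long delayMillis;
    private final Deque<Entry> queue = new ArrayDeque<>();

    DelayLevelQueue(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Assumes storeTimestamp values arrive in non-decreasing order.
    void offer(String msg, long storeTimestamp) {
        queue.addLast(new Entry(storeTimestamp + delayMillis, msg));
    }

    // Moves every due message into `retryQueue` (stand-in for re-storing it
    // under its real topic) and returns how many were delivered.
    int deliverDue(long now, List<String> retryQueue) {
        int delivered = 0;
        while (!queue.isEmpty() && queue.peekFirst().deliverAt <= now) {
            retryQueue.add(queue.pollFirst().msg);
            delivered++;
        }
        return delivered;
    }
}
```

The per-level queue design is what makes this cheap. A single queue mixing different delays would need a priority queue or a full scan instead.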
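RocketMQ guarantees that every message is consumed at least once, but duplicate consumption can occur. Developers need to design their consumers to handle duplicates themselves.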
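A common approach is to make consumption idempotent. Below is a minimal sketch of a hypothetical IdempotentHandler that records a deduplication key before running the business logic. As described above, a retry is stored as a new message with its own message ID. The key should therefore be a business key, such as an order ID, or the original message ID, not the retry message's own ID. The in-memory set is only for illustration; a real system would typically rely on a database unique constraint or a shared store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotency guard for at-least-once delivery.
final class IdempotentHandler<T> {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<T> business;

    IdempotentHandler(Consumer<T> business) {
        this.business = business;
    }

    // Returns true if the payload was processed, false if the key was already seen.
    boolean handle(String dedupKey, T payload) {
        if (!processedKeys.add(dedupKey)) {
            return false; // duplicate delivery: skip
        }
        try {
            business.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Release the key so a later redelivery can try again.
            processedKeys.remove(dedupKey);
            throw e;
        }
    }
}
```

Releasing the key on failure matters. Otherwise a message whose first attempt threw an exception would be silently skipped when RocketMQ redelivers it.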