消息消费,简而言之就是消费者从消息队列里读取数据。消费者有两种消费方式:
Push方式 。消息服务器接收到信息后,主动把消息推送给消费者,实时性高。但是这样加大了消息服务器的工作压力,会影响其性能。除此之外,不同消费者的处理信息的能力不同,可能无法及时的消费消息,造成 慢消费 问题。相关类是DefaultMQPushConsumer
。
Pull方式 。消费者主动向消息服务器拉取消息,主动权在消费者这里。主要的问题是循环拉取消息的间隔不好设定,设置的间隔时间太久会增加消息的延迟;设置的事件间隔太短,如果消费服务器里没有可用的消息,那么会造成很多无用的请求开销,影响其性能。相关类是DefaultMQPullConsumer
。
消息消费以组的模式开展,一个消费组里可以包含多个消费者,每一个消费组可以订阅多个主题,消费组之间有集群模式 和广播模式 两种消费模式。
集群模式: 同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里的所有的Consumer消费的内容合起来才是订阅的Topic内容的整体,从而达到负载均衡的目的。
广播模式 同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部信息,也就是一个消息会被多次分发,被多个Consumer消费。
首先先将一下DefaultMQPushConsumer的相关操作。
DefaultMQPushConsumer的启动 启动方法是在DefaultMQPushConsumerImpl.start()方法。
首先会根据服务状态选择策略。定义的状态如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED; }
1 this .serviceState = ServiceState.START_FAILED;
如果是RUNNING、 START_FAILED ,则跳过该环节,直接进行下一环节。如果是SULTdOWN_ALREDAY ,则抛出异常。如果是CREATE_JUST ,则需要进入执行该环节的代码。 进入里面的区域时,先预设serviceState的值为START_FAILE,在执行一段操作后,如果注册消费者没有成功,则修改serviceState为CREATE_JUST,并抛出异常;如果顺利执行则修改serviceState为RUNNING。
1 2 3 this .checkConfig();this .copySubscription();
订阅主题订阅消息SubscriptionData,并放入到RebalanceImpl的订阅消息中。订阅关系来源主要有两个。
defaultMQPushConsumer.getSubscription()
订阅重试主题消息。RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题为%RETRY%+消费组名。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void copySubscription () throws MQClientException { try { Map<String, String> sub = this .defaultMQPushConsumer.getSubscription(); if (sub != null ) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this .defaultMQPushConsumer.getConsumerGroup(), topic, subString); this .rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this .messageListenerInner) { this .messageListenerInner = this .defaultMQPushConsumer.getMessageListener(); } switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break ; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this .defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this .defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this .rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break ; default : break ; } } catch (Exception e) { throw new MQClientException("subscription exception" , e); } }
如果当前是集群消费模式,修改实例名为Pid。
1 2 3 if (this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this .defaultMQPushConsumer.changeInstanceNameToPID(); }
初始化MQClientInstance、ReblanceImple(消息重新负载实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 this .mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this .defaultMQPushConsumer, this .rpcHook); this .rebalanceImpl.setConsumerGroup(this .defaultMQPushConsumer.getConsumerGroup()); this .rebalanceImpl.setMessageModel(this .defaultMQPushConsumer.getMessageModel()); this .rebalanceImpl.setAllocateMessageQueueStrategy(this .defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this .rebalanceImpl.setmQClientFactory(this .mQClientFactory); this .pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
处理offset存储方式。 如果消息消费是集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息消费进度存储在消费端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (this .defaultMQPushConsumer.getOffsetStore() != null ) { this .offsetStore = this .defaultMQPushConsumer.getOffsetStore(); } else { switch (this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this .offsetStore = new LocalFileOffsetStore(this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; case CLUSTERING: this .offsetStore = new RemoteBrokerOffsetStore(this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; default : break ; } } this .offsetStore.load();
根据MessageListener的具体实现方式选择具体的消息拉取线程实现 可以选择顺序消息消费服务或者并行消息消费服务 最后执行ConsumerMessageService主要负责消费消息,内部维护一个线程池。
1 2 3 4 5 6 7 8 9 10 11 if (this .getMessageListenerInner() instanceof MessageListenerOrderly) { this .consumeOrderly = true ; this .consumeMessageService = new ConsumeMessageOrderlyService(this , (MessageListenerOrderly) this .getMessageListenerInner()); } else if (this .getMessageListenerInner() instanceof MessageListenerConcurrently) { this .consumeOrderly = false ; this .consumeMessageService = new ConsumeMessageConcurrentlyService(this , (MessageListenerConcurrently) this .getMessageListenerInner()); } this .consumeMessageService.start();
向MQClientInstance注册消费者,并启动MQClientInstance,在一个JVM中的所有消费者、生产者持有同一个MQClientInstance,MQClientInstance只会启动一次。
1 2 3 4 5 6 7 8 9 10 boolean registerOK = mQClientFactory.registerConsumer(this .defaultMQPushConsumer.getConsumerGroup(), this ); if (!registerOK) { this .serviceState = ServiceState.CREATE_JUST; this .consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this .defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null ); } mQClientFactory.start();
订阅关系改变,更新NameServer的订阅关系表。 检查客户端状态 发送心跳条 唤醒执行消费者负载均衡。
1 2 3 4 5 this .updateTopicSubscribeInfoWhenSubscriptionChanged();this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately();
在上面提到了offset的存储问题。现在先讲一下什么是offset和存储规则。
消息消费进度记录 消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,否则当消费者重新启动时又得从消息消费队列的开始消费,这样显然会产生问题。一次消息消费后会从ProcessQueue处理队列中移除该批消息,返回ProcessQueue最小偏移量,并存入到消息进度表中。该消息进度表的存储位置和机制是一个重要的问题。
由上面可知,有两种消费模式,广播模式 和 集群模式
广播模式: 同一消费组的所有消费者都会消费该主题下的所有消息。即同一个消息会被所有消费者消费,所以每个消费者应该各自独立有一个记录消费进度的文件。 广播模式下消息进度存储在消费者本地,主要类是LocalFileOffsetStore.
集群模式: 同一消费者的所有消费者共同消费该主题下的所有消息,一个消息只能被一个消费者所消费,即每个消费者消费的是该消费主题下的部分消息,所以消息消费进度记录被所有消费者所共享。 集群模式消息进度存储文件存放在消息服务端Broker,主要类中RemoteBrokerOffsetStore。
几个重要有关消息消费的类 每一个PullRequest代表一个消费的分组单元 PullRequest会记录一个topic记录对应的consumerGroup的拉取进度。
PullRequest 1 2 3 4 5 6 7 8 9 10 public class PullRequest { private String consumerGroup; # 待拉取消息队列 private MessageQueue messageQueue; # 消息处理队列,从Broker拉取到的消费先存入到ProcessQueue,然后提交到消费者消费线程池消费 private ProcessQueue processQueue; # 待拉取的MessageQueue偏移量 private long nextOffset; # 是否被锁定 private boolean lockedFirst = false ;
ProcessQueue是MessageQueue在消费端的重现、快照。
ProcessQueue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ProcessQueue { public final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime" , "30000" )); public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval" , "20000" )); private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime" , "120000" )); private final Logger log = ClientLogger.getLog(); private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); # 消息存储容器,键为消息在ConsumeQueue中的偏移量,value为消息实体 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); # ProcessQueue中的消息总数 private final AtomicLong msgCount = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); # 消息临时存储容器,消息消费线程从ProcessQueue的msgTreeMap中取出消息前,先将消息临时存储在msgTreeMapTemp中。 private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0 ); private volatile long queueOffsetMax = 0L ; # 当前ProcessQueue是否被丢弃 private volatile boolean dropped = false ; # 上一次开始消息拉取时间戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); # 上一次消息消费时间戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); private volatile boolean locked = false ; private volatile long lastLockTimestamp = System.currentTimeMillis(); private volatile boolean consuming = false ; private volatile long msgAccCnt = 0 ;
DefaultMQPushConsumer 客户端消费者实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; # 消费者组名 private String consumerGroup; # 消费模式 private MessageModel messageModel = MessageModel.CLUSTERING; # 消费者从哪个位置消费 # CONSUME_FROM_LAST_OFFSET: 第一次启动从队列最后位置消费,后续再接着上次消费的进度开始消费 # CONSUME_FROM_First_OFFSET: 第一次启动从队列开始位置消费,后续再接着上次消费的进度开始消费 # CONSUME_FROM_TimeStamp: 第一次启动从指定时间点位置消费,后续再接着上次消费的进度开始消费 # 这里的第一次启动指的是该消费者之前没有消费过该消息队列,如果消费过,则会在Broker端记录消费位置,如果该消费者挂了再启动时,会自动从上次消费的地方开始。 private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30 )); # 消费分配策略,默认消息平均分配给所有客户端 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; # topic对应的订阅tag private Map<String , String > subscription = new HashMap<String, String>(); # 客户端消费消息的实现类 private MessageListener messageListener; # 存储实现,本地存储或者Broker存储 private OffsetStore offsetStore; # Minimum consumer thread number private int consumeThreadMin = 20 ; # Max consumer thread number private int consumeThreadMax = 64 ; # Threshold for dynamic adjustment of the number of thread pool private long adjustThreadPoolNumsThreshold = 100000 ; # 单队列并行消费的最大跨度,用于流量控制 private int consumeConcurrentlyMaxSpan = 2000 ; # 一个queue最大消费的消息个数,用于流量控制 private int pullThresholdForQueue = 1000 ; # 消息拉取时间间隔,默认为0, private long pullInterval = 0 ; # 并发消费时,一次消费消息的数量 private int consumeMessageBatchMaxSize = 1 ; # 消息拉取一次的数量 private int pullBatchSize = 32 ; # Whether update subscription relationship when every pull private boolean postSubscriptionWhenPull = false ; # Whether the unit of subscription group private boolean unitMode = false ; private int maxReconsumeTimes = -1 ; private long suspendCurrentQueueTimeMillis = 1000 ; # Maximum amount of time in minutes a message may block the consuming thread. private long consumeTimeout = 15 ;
DefaultMQPushConsumerImpl 消费者具体实现类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class DefaultMQPushConsumerImpl implements MQConsumerInner { private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000 ; private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50 ; private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000 ; private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15 ; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30 ; private final Logger log = ClientLogger.getLog(); private final DefaultMQPushConsumer defaultMQPushConsumer; private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this ); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private final long consumerStartTimestamp = System.currentTimeMillis(); private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final RPCHook rpcHook; private volatile ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private PullAPIWrapper pullAPIWrapper; private volatile boolean pause = false ; private boolean consumeOrderly = false ; private MessageListener messageListenerInner; private OffsetStore offsetStore; private ConsumeMessageService consumeMessageService; private long flowControlTimes1 = 0 ; private long flowControlTimes2 = 0 ;
消息拉取基本流程 具体的实现类是DefaultMQPushConsumerImpl 消息拉取主要有3个主要步骤:
消费拉取客户端消息拉取请求封装
消息服务器查找并返回消息
消息拉取客户端处理返回的消息
客户端封装消息拉取请求
1.1 从PullRequest中获取ProcessQueue,如果处理队列当前状态未被丢弃,则更新ProcessQueue的lastPullTimestamp为当前时间戳;如果当前消费者被挂起,则将拉取任务延迟1s再次放入到PullMessageService的拉取任务中,结束本次消息拉取。
1.2 进行消息拉取流量控制
PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行,在PushConsumer运行的时候,每个Message Queue都有一个对用的ProcessQueue对象,保存了这个Message Queue 消息处理状态的快照。
ProcessQueue对象里主要内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取的,但是还未被处理的信息,读写锁控制着多个线程对TreeMap对象的并发处理。
流量控制策略:
消息处理总数,如果ProcessQueue当前处理的消息超过了pullThresholdForQueue=1000将触发流量控制,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中。
ProcessQueue中队列最大偏移量与最小偏移量的间距,不能超过consumeConcurrencyMaxSpan,否则触发流量控制。
1.3 拉取该主题订阅信息,如果为空,结束本次消息拉取,关于该队列的下一次拉取任务延迟3秒。 1.4 构建消息拉取系统标记。 1.5 调用PullAPIWrapper.pullKernelImpl方法后与服务端交互。 1.6 根据brokerName、BrokerId从MQClientInstance中获取Broker地址,在整个RocketMQ Broker的部署结构中,相同名称的Broker构成主从结构,其BrokerId会不一样,在每次拉取消息后,会给出一个建议,下次拉取从主节点还是从节点拉取。
然后是消息服务端Broker组装消息。会根据处理的结果返回不同的状态编码。 主要有下面几种状态编码。
SUCCESS : 成功
PULL_RETRY_IMMEDIATElY : 立即重试
PULL_OFFSET_MOVED : 偏移量移动
PULL_NOT_FOUND : 未找到消息
消息拉取客户端处理消息。
先分析状态编码为SUCCESS的后续处理步骤。
更新PullRequest的下一次拉取偏移量,如果msgFoundList为空,则立即将PullRequest放入到PullMessageService的pullRequestQueue,以便PullMessageService能及时唤醒并再次执行消息拉取。
将拉取到的消息存放到ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中供消费者消费。
将消费提交给消费者线程之后PullCallBack将立即返回,可以说本次消息拉取顺利完成,然后根据pullInterval参数,如果pullInterval > 0,则等待pullInterval毫秒后将PullRequest对象放入到PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果。
如果拉取结果为偏移量非法,首先将ProcessQueue设置dropped为ture,表示丢弃该消息队列,意味着ProcessQueue中拉取的消息将停止消费,然后根据服务端下一次校对的偏移量尝试更新消息消费进度,然后尝试持久化消息消费进度,并将该消息队列从RebalanceImpl的处理队列中移除,意味着暂停该消息队列的消息拉取,等待下一次消息队列重新加载。
这篇对消息拉取的笔记就暂时写到这里,下一篇阿静详细介绍关于消息拉取失败后的长轮询方法。