Rocketmq源碼分析12:consumer 負(fù)載均衡
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉(cāng)庫(kù)鏈接:https://gitee.com/funcy/rocketmq.git.
接上文,繼續(xù)分析consumer消費(fèi)流程。
5. 如何選擇消息隊(duì)列:RebalanceService
讓我們回到PullMessageService#run()方法:
public class PullMessageService extends ServiceThread {
...
private final LinkedBlockingQueue<PullRequest> pullRequestQueue
= new LinkedBlockingQueue<PullRequest>();
/**
* 將 pullRequest 放入 pullRequestQueue 中
*/
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 從 pullRequestQueue 獲取一個(gè) pullRequest,阻塞的方式
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
...
}
PullMessageService線程獲得了pullRequest后,然后就開始了一次又一次的拉起消息的操作,那這個(gè)pullRequest最初是在哪里添加進(jìn)來(lái)的呢?這就是本節(jié)要分析的「負(fù)載均衡」功能了。
處理負(fù)載均衡的線程為RebalanceService,它是在MQClientInstance#start方法中啟動(dòng)的,我們直接進(jìn)入其run()方法:
public class RebalanceService extends ServiceThread {
// 省略其他
...
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
在它的run()方法中,僅是調(diào)用了MQClientInstance#doRebalance方法,我們繼續(xù)進(jìn)入:
public void doRebalance() {
// consumerTable 存放的就是當(dāng)前 consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
在MQClientInstance#doRebalance方法中,會(huì)遍歷所有的consumer,然后調(diào)用DefaultMQPushConsumerImpl#doRebalance方法作進(jìn)一步的處理,consumerTable就是用來(lái)保存DefaultMQPushConsumerImpl實(shí)例的,繼續(xù)進(jìn)入DefaultMQPushConsumerImpl#doRebalance方法:
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
繼續(xù)跟進(jìn),來(lái)到RebalanceImpl#doRebalance方法:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 客戶端負(fù)載均衡:根據(jù)主題來(lái)處理負(fù)載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
/**
* 這就是最張?zhí)幚碡?fù)載均衡的地方了
*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 廣播模式:不需要處理負(fù)載均衡,每個(gè)消費(fèi)者都要消費(fèi),只需要更新負(fù)載信息
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 更新負(fù)載均衡信息,這里傳入的參數(shù)是mqSet,即所有隊(duì)列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info(...);
}
} else {
log.warn(...);
}
break;
}
// 集群模式
case CLUSTERING: {
// 根據(jù)訂閱的主題獲取消息隊(duì)列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 客戶端id,根據(jù) topic 與 consumerGroup 獲取所有的 consumerId
List<String> cidAll = this.mQClientFactory
.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn(...);
}
}
if (null == cidAll) {
log.warn(...);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序后才能保證消費(fèi)者負(fù)載策略相對(duì)穩(wěn)定
Collections.sort(mqAll);
Collections.sort(cidAll);
// MessageQueue 的負(fù)載策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 按負(fù)載策略進(jìn)行分配,返回當(dāng)前消費(fèi)者實(shí)際訂閱的messageQueue集合
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error(...);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新負(fù)載均衡信息,傳入?yún)?shù)是 allocateResultSet,即當(dāng)前consumer分配到的隊(duì)列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(...);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
RebalanceImpl#rebalanceByTopic方法就是最終處理負(fù)載均衡的方法了,在這個(gè)方法里會(huì)區(qū)分廣播模式與集群模式的處理。
在廣播模式下,一條消息會(huì)被同一個(gè)消費(fèi)組中的所有consumer消費(fèi),而集群模式下,一條消息只會(huì)被同一個(gè)消費(fèi)組下的一個(gè)consumer消費(fèi)。
正是因?yàn)槿绱耍瑥V播模式下并沒有負(fù)載均衡可言,直接把所有的隊(duì)列都分配給當(dāng)前consumer處理,然后更新QueueTable的負(fù)載均衡信息;而集群模式會(huì)先分配當(dāng)前consumer消費(fèi)的消息隊(duì)列,再更新QueueTable的負(fù)載均衡信息。
這里我們來(lái)看看集群模式,看看它的操作:
strategy.allocate(...):按負(fù)載均衡策略為當(dāng)前consumer分配隊(duì)列updateProcessQueueTableInRebalance(...):更新負(fù)載均衡信息。
在rocketMq中,提供了這些負(fù)載均衡策略:
AllocateMessageQueueAveragely:平均負(fù)載策略,rocketMq默認(rèn)使用的策略AllocateMessageQueueAveragelyByCircle:環(huán)形平均分配,這個(gè)和平均分配唯一的區(qū)別就是,再分隊(duì)列的時(shí)候,平均隊(duì)列是將屬于自己的MessageQueue全部拿走,而環(huán)形平均則是,一人拿一個(gè),拿到的Queue不是連續(xù)的。AllocateMessageQueueByConfig:用戶自定義配置AllocateMessageQueueByMachineRoom:同機(jī)房負(fù)載策略,這個(gè)策略就是當(dāng)前Consumer只負(fù)載處在指定的機(jī)房?jī)?nèi)的MessageQueue,brokerName的命名必須要按要求的格式來(lái)設(shè)置:機(jī)房名@brokerNameAllocateMachineRoomNearby:就近機(jī)房負(fù)載策略,在AllocateMessageQueueByMachineRoom策略中,如果同一機(jī)房中只有MessageQueue而沒有consumer,那這個(gè)MessageQueue上的消息該如何消費(fèi)呢?AllocateMachineRoomNearby就是擴(kuò)充了該功能的處理AllocateMessageQueueConsistentHash:一致性哈希策略
這里我們重點(diǎn)來(lái)分析平均負(fù)載策略AllocateMessageQueueAveragely:
public List<MessageQueue> allocate(String consumerGroup, String currentCID,
List<MessageQueue> mqAll, List<String> cidAll) {
// 返回值
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 省略一些判斷操作
...
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
// 1. 消費(fèi)者數(shù)量大于隊(duì)列數(shù)量:averageSize = 1
// 2. 消費(fèi)者數(shù)量小于等于隊(duì)列數(shù)量:averageSize = 隊(duì)列數(shù)量 / 消費(fèi)者數(shù)量,還要處理個(gè)+1的操作
int averageSize = mqAll.size() <= cidAll.size()
? 1 : (mod > 0 && index < mod
? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
這個(gè)方法中,關(guān)鍵的分配方法就在后面幾行,如果只看代碼,會(huì)感覺有點(diǎn)暈,這里我舉一個(gè)例子來(lái)簡(jiǎn)單解釋下:
假設(shè):messageQueue一共有6個(gè),consumer有4個(gè),當(dāng)前consumer的index為1,有了這些前提后,接下來(lái)我們就來(lái)看它的分配過程了。
計(jì)算取余操作:
6 % 4 = 2,這表明messageQueue不能平均分配給每個(gè)consumer,接下來(lái)就來(lái)看看這個(gè)余數(shù)2是如何處理的計(jì)算每個(gè)
consumer平均處理的messageQueue數(shù)量消費(fèi)者索引 0 1 2 3 處理數(shù)量 2 2 1 1 這里需要注意,如果 consumer數(shù)量大于messageQueue數(shù)量,那每個(gè)consumer最多只會(huì)分配到一個(gè)messageQueue,這種情況下,余數(shù)2不會(huì)進(jìn)行處理,并且有的consumer處理的messageQueue數(shù)量為0,同一個(gè)messageQueue不會(huì)同時(shí)被兩個(gè)及以上的consumer消費(fèi)掉這里的 messageQueue數(shù)量為6,consumer為4,計(jì)算得到每個(gè)consumer處理的隊(duì)列數(shù)最少為1,除此之外,為了實(shí)現(xiàn)“平均”,有2個(gè)consumer會(huì)需要多處理1個(gè)messageQueue,按“平均”的分配原則,如果index小于mod,則會(huì)分配多1個(gè)messageQueue,這里的mod為2,結(jié)果如下:分配完每個(gè)
consumer處理的messageQueue數(shù)量后,這些messageQueue該如何分配呢?從代碼來(lái)看,分配時(shí)會(huì)先分配完一個(gè)consumer,再分配下一個(gè)consumer,最終結(jié)果就是這樣:隊(duì)列 Q0 Q1 Q2 Q3 Q4 Q5 消費(fèi)者 C1 C1 C2 C2 C4 C5
從圖中可以看到,在6個(gè)messageQueue、4個(gè)consumer、當(dāng)前consumer的index為1的情況下,當(dāng)前consumer會(huì)分到2個(gè)隊(duì)列,分別為Q2/Q3.
將messageQueue分配完成后,接下來(lái)就是更新負(fù)載信息了,方法為RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet, final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn(...);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
}
// pullRequest 最初產(chǎn)生的地方:mq 不存在,就添加
else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 添加 pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 發(fā)布
this.dispatchPullRequest(pullRequestList);
return changed;
}
這個(gè)方法中最最關(guān)鍵的就是pullRequestList的添加操作了:先遍歷傳入的MessageQueue,如果當(dāng)前consumer沒有消費(fèi)過該messageQueue,則添加一個(gè)新的pullRequest到pullRequestList,之后就是發(fā)布pullRequestList了。
看到這里,我們就應(yīng)該能明白,最初的pullRequest就是在這里產(chǎn)生的,而發(fā)布pullRequestList的操作,就是將pullRequest丟給pullMessageService線程處理了:
/**
* RebalancePushImpl#dispatchPullRequest:發(fā)布pullRequest的操作
*/
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// 在這里執(zhí)行pullRequest,其實(shí)就是把 pullRequest 添加到
// PullMessageService#pullRequestQueue 中
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
限于篇幅,本文就先到這里了,下篇繼續(xù)。
限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
本文首發(fā)于微信公眾號(hào) 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號(hào),讓我們一起在技術(shù)的世界里探秘吧!
