高并發(fā):RocketMQ 削峰實(shí)戰(zhàn)!
ckground-color: rgb(255, 255, 255);text-align: left;box-sizing: border-box !important;overflow-wrap: break-word !important;">本文來(lái)源:
https://juejin.im/post/5ea159e4f265da47f0794da5
-
Producer:生產(chǎn)發(fā)送消息 -
Broker:存儲(chǔ)Producer發(fā)送過(guò)來(lái)的消息 -
Consumer:從Broker拉取消息并進(jìn)行消費(fèi) -
NameServer:為Producer或Consumer路由到Broker
-
RocketMQ的Consumer獲取消息是通過(guò)向Broker發(fā)送拉取請(qǐng)求獲取的,而不是由Broker發(fā)送Consumer接收的方式。 -
Consumer每次拉取消息時(shí)消息都會(huì)被均勻分發(fā)到消息隊(duì)列再進(jìn)行傳輸,所以RocketMQ中的很多參數(shù)都是針對(duì)隊(duì)列而不是Topic的(這個(gè)是重點(diǎn),順便吐槽下源碼的文檔講的真不清晰,很多都需要自己試錯(cuò),但Dashboard做得很好),其中每個(gè)Broker消息隊(duì)列(ConsumeQueue)的數(shù)量都可以通過(guò)RocketMQ DashBoard實(shí)時(shí)更改調(diào)整。
rocketmq-spring-boot-starter用法簡(jiǎn)介
rocketmq-spring-boot-starter
相關(guān)類:
-
RocketMQListener
接口:消費(fèi)者都需實(shí)現(xiàn)該接口的消費(fèi)方法onMessage(msg)
。 -
RocketMQPushConsumerLifecycleListener
接口:當(dāng)@RocketMQMessageListener
中的配置不足以滿足我們的需求時(shí),可以實(shí)現(xiàn)該接口直接更改消費(fèi)者類DefaultMQPushConsumer
配置 -
@RocketMQMessageListener
:被該注解標(biāo)注并實(shí)現(xiàn)了接口RocketMQListener
的bean為一個(gè)消費(fèi)者并監(jiān)聽(tīng)指定topic隊(duì)列中的消息,該注解中包含消費(fèi)者的一些常用配置(大部分按默認(rèn)即可),一般只需更改consumerGroup(消費(fèi)組)與topic。RocketMQMessageListener
中的屬性配置是可以使用Placeholder(占位符)從配置文件或配置中心獲取的,如下圖:
業(yè)務(wù)案例
環(huán)境配置
文章例子環(huán)境:1NameServer + 2Broker + 1Consumer
添加maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
復(fù)制代碼
application.yml配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: praise-group
server:
port: 10000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: tiger
url: jdbc:mysql://localhost:3306/wilson
swagger:
docket:
base-package: io.rocket.consumer.controller
復(fù)制代碼
點(diǎn)贊接口
PraiseRecord(點(diǎn)贊記錄):
@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
}
復(fù)制代碼
MessageController(簡(jiǎn)單的測(cè)試接口):
RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}
// ......
}
復(fù)制代碼
sendOneyWay()
進(jìn)行消息發(fā)送。
RocketMQ的消息發(fā)送方式主要含syncSend()同步發(fā)送、asyncSend()異步發(fā)送、sendOneWay()三種方式,sendOneWay()也是異步發(fā)送,區(qū)別在于不需等待Broker返回確認(rèn),所以可能會(huì)存在信息丟失的狀況,但吞吐量更高,具體需根據(jù)業(yè)務(wù)情況選用。
性能:sendOneWay > asyncSend > syncSend RocketMQTemplate的send()方法默認(rèn)是同步(syncSend)的,更多可看源碼實(shí)現(xiàn)。
PraiseListener:點(diǎn)贊消息消費(fèi)者
@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;
@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的間隔,單位為毫秒
consumer.setPullInterval(2000);
// 設(shè)置每次從隊(duì)列中拉取的消息數(shù)為16
consumer.setPullBatchSize(16);
}
}
MessageStoreConfig.maxTransferCountOnMessageInMemory
(默認(rèn)為32)值限制,即若想要消費(fèi)者從隊(duì)列拉取的消息數(shù)大于32有效(pullBatchSize>32)則需更改Broker的啟動(dòng)參數(shù)
maxTransferCountOnMessageInMemory
值。在MQ削峰的配置參數(shù)里,以下幾個(gè)
DefaultMQPushConsumer
的參數(shù)是需要注意一下的:
-
pullInterval:每次從Broker拉取消息的間隔,單位為毫秒 -
pullBatchSize:每次從Broker隊(duì)列拉取到的消息數(shù),該參數(shù)很容易讓人誤解,一開(kāi)始我以為是每次拉取的消息總數(shù),但測(cè)試過(guò)幾次后確認(rèn)了實(shí)質(zhì)上是從每個(gè)隊(duì)列的拉取數(shù)(源碼上的注釋文檔真的很差,跟沒(méi)有一樣),即Consume每次拉取的消息總數(shù)如下: EachPullTotal=所有Broker上的寫(xiě)隊(duì)列數(shù)和(writeQueueNums=readQueueNums) * pullBatchSize
-
consumeMessageBatchMaxSize:每次消費(fèi)(即將多條消息合并為L(zhǎng)ist消費(fèi))的最大消息數(shù)目,默認(rèn)值為1,rocketmq-spring-boot-starter 目前不支持批量消費(fèi)(2.1.0版本)
上線了但消費(fèi)效率預(yù)估失誤如何動(dòng)態(tài)更改消費(fèi)效率 ?
如何使用RocketMQ批量消費(fèi) ?
DefaultMQPushConsumer
并配置其
consumeMessageBatchMaxSize
屬性。
consumeMessageBatchMaxSize
屬性默認(rèn)值為1,即每次只消費(fèi)一條消息,需要注意的是該屬性也會(huì)受
pullBatchSize
影響,如果
consumeMessageBatchMaxSize
為32但
pullBatchSize
只為12,那么每次批量消費(fèi)的最大消息數(shù)也就只有12。如下為個(gè)人測(cè)試批量消費(fèi)Consumer的測(cè)試bean:
@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
// 設(shè)置每次消息拉取的時(shí)間間隔,單位毫秒
consumer.setPullInterval(1000);
// 設(shè)置每個(gè)隊(duì)列每次拉取的最大消息數(shù)
consumer.setPullBatchSize(24);
// 設(shè)置消費(fèi)者單次批量消費(fèi)的消息數(shù)目上限
consumer.setConsumeMessageBatchMaxSize(12);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
-> {
List<UserInfo> userInfos = new ArrayList<>(msgs.size());
Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
msgs.forEach(msg -> {
userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
});
log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
/*
處理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}
consumeMessageBatchMaxSize
與
pullBatchSize
,且
pullBatchSize
較小,所以每次消費(fèi)的消息數(shù)最大值為12,如下圖:
附本文相關(guān)信息
-
確保mqnamesrv與mqbroker已啟動(dòng)成功,如該文章環(huán)境的啟動(dòng): mqnamesrv -n 127.0.0.1:9876
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties
-
RocketMQ DashBoard啟動(dòng)流程可參考官方github文檔或到我的資源里下載jar包運(yùn)行 -
源碼地址(https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq),2m-noslave目錄是該文章中例子中的2master broker配置與啟動(dòng)腳本,spring-boot-consumer-peak目錄為包含該文章相關(guān)代碼的實(shí)際例子
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒(méi)關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!
ckquote>