springboot + rabbitmq 用了消息確認(rèn)機制,感覺掉坑里了
最近部門號召大伙多組織一些技術(shù)分享會,說是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI
。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術(shù)交流還是很有助于個人成長的。
于是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點KPI
,就是想和大伙一起學(xué)習(xí)學(xué)習(xí)!
這次我分享的是 springboot
+ rabbitmq
如何實現(xiàn)消息確認(rèn)機制,以及在實際開發(fā)中的一點踩坑經(jīng)驗,其實整體的內(nèi)容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。
可以看到使用了 RabbitMQ
以后,我們的業(yè)務(wù)鏈路明顯變長了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場景也增加了。例如:
-
消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失?。?/p>
-
rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失
-
消息消費者 - > rabbitmq服務(wù)(消費消息失?。?/p>
所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認(rèn)機制以后,盡管很大程度上保證了消息的準(zhǔn)確送達(dá),但由于頻繁的確認(rèn)交互,rabbitmq
整體效率變低,吞吐量下降嚴(yán)重,不是非常重要的消息真心不建議你用消息確認(rèn)機制。
下邊我們先來實現(xiàn)springboot
+ rabbitmq
消息確認(rèn)機制,再對遇到的問題做具體分析。
一、準(zhǔn)備環(huán)境
1、引入 rabbitmq 依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、修改 application.properties 配置
配置中需要開啟 發(fā)送端
和 消費端
的消息確認(rèn)。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 發(fā)送者開啟 confirm 確認(rèn)機制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開啟 return 確認(rèn)機制
spring.rabbitmq.publisher-returns=true
####################################################
# 設(shè)置消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
3、定義 Exchange 和 Queue
定義交換機 confirmTestExchange
和隊列 confirm_test_queue
,并將隊列綁定在交換機上。
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
rabbitmq
的消息確認(rèn)分為兩部分:發(fā)送消息確認(rèn) 和 消息接收確認(rèn)。
二、消息發(fā)送確認(rèn)
發(fā)送消息確認(rèn):用來確認(rèn)生產(chǎn)者 producer
將消息發(fā)送到 broker
,broker
上的交換機 exchange
再投遞給隊列 queue
的過程中,消息是否成功投遞。
消息從 producer
到 rabbitmq broker
有一個 confirmCallback
確認(rèn)模式。
消息從 exchange
到 queue
投遞失敗有一個 returnCallback
退回模式。
我們可以利用這兩個Callback
來確保消的100%送達(dá)。
1、 ConfirmCallback確認(rèn)模式
消息只要被 rabbitmq broker
接收到就會觸發(fā) confirmCallback
回調(diào) 。
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息發(fā)送異常!");
} else {
log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
實現(xiàn)接口 ConfirmCallback
,重寫其confirm()
方法,方法內(nèi)有三個參數(shù)correlationData
、ack
、cause
。
-
correlationData
:對象內(nèi)部只有一個id
屬性,用來表示當(dāng)前消息的唯一性。 -
ack
:消息投遞到broker
的狀態(tài),true
表示成功。 -
cause
:表示投遞失敗的原因。
但消息被 broker
接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會被投遞到目標(biāo) queue
里。所以接下來需要用到 returnCallback
。
2、 ReturnCallback 退回模式
如果消息未能投遞到目標(biāo) queue
里將觸發(fā)回調(diào) returnCallback
,一旦向 queue
投遞消息未成功,這里一般會記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補償?shù)炔僮鳌?/p>
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
實現(xiàn)接口ReturnCallback
,重寫 returnedMessage()
方法,方法有五個參數(shù)message
(消息體)、replyCode
(響應(yīng)code)、replyText
(響應(yīng)內(nèi)容)、exchange
(交換機)、routingKey
(隊列)。
下邊是具體的消息發(fā)送,在rabbitTemplate
中設(shè)置 Confirm
和 Return
回調(diào),我們通過setDeliveryMode()
對消息做持久化處理,為了后續(xù)測試創(chuàng)建一個 CorrelationData
對象,添加一個id
為10000000000
。
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
/**
* 確保消息發(fā)送失敗后可以重新返回到隊列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消費者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投遞到隊列失敗回調(diào)處理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 發(fā)送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
三、消息接收確認(rèn)
消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡單一點,因為只有一個消息回執(zhí)(ack
)的過程。使用@RabbitHandler
注解標(biāo)注的方法要增加 channel
(信道)、message
兩個參數(shù)。
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("小富收到消息:{}", msg);
//TODO 具體業(yè)務(wù)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重復(fù)處理失敗,拒絕再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("消息即將再次返回隊列處理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
消費消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。
1、basicAck
basicAck
:表示成功確認(rèn),使用此回執(zhí)方法后,消息會被rabbitmq broker
刪除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag
都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag
的消息進(jìn)行ack
、nack
、reject
等操作。
multiple
:是否批量確認(rèn),值為 true
則會一次性 ack
所有小于當(dāng)前消息 deliveryTag
的消息。
舉個栗子: 假設(shè)我先發(fā)送三條消息deliveryTag
分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時deliveryTag
為8,multiple
設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。
2、basicNack
basicNack
:表示失敗確認(rèn),一般在消費消息業(yè)務(wù)異常時用到此方法,可以將消息重新投遞入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
:表示消息投遞序號。
multiple
:是否批量確認(rèn)。
requeue
:值為 true
消息將重新入隊列。
3、basicReject
basicReject
:拒絕消息,與basicNack
區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
:表示消息投遞序號。
requeue
:值為 true
消息將重新入隊列。
四、測試
發(fā)送消息測試一下消息確認(rèn)機制是否生效,從執(zhí)行結(jié)果上看發(fā)送者發(fā)消息后成功回調(diào),消費端成功的消費了消息。用抓包工具Wireshark
觀察一下rabbitmq
amqp協(xié)議交互的變化,也多了 ack
的過程。
五、踩坑日志
1、別忘確認(rèn)消息
這是一個非常沒技術(shù)含量的坑,但卻是非常容易犯錯的地方。
開啟消息確認(rèn)機制,消費消息別忘了channel.basicAck
,否則消息會一直存在,導(dǎo)致重復(fù)消費。
2、消息無限投遞
在我最開始接觸消息確認(rèn)機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0
發(fā)生異常后將消息重新投入隊列。
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消費者 2 號收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
但是有個問題是,業(yè)務(wù)代碼一旦出現(xiàn) bug
99.9%的情況是不會自動修復(fù),一條消息會被無限投遞進(jìn)隊列,消費端無限執(zhí)行,導(dǎo)致了死循環(huán)。
本地的CPU
被瞬間打滿了,大家可以想象一下當(dāng)時在生產(chǎn)環(huán)境導(dǎo)致服務(wù)死機,我是有多慌。
而且rabbitmq management
只有一條未被確認(rèn)的消息。
經(jīng)過測試分析發(fā)現(xiàn),當(dāng)消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。
消費者會立刻消費這條消息,業(yè)務(wù)處理再拋出異常,消息再重新入隊,如此反復(fù)進(jìn)行。導(dǎo)致消息隊列處理出現(xiàn)阻塞,導(dǎo)致正常消息也無法運行。
而我們當(dāng)時的解決方案是,先將消息進(jìn)行應(yīng)答,此時消息隊列會刪除該條消息,同時我們再次發(fā)送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業(yè)務(wù)的進(jìn)行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發(fā)送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優(yōu)化設(shè)置了消息重試次數(shù),達(dá)到了重試上限以后,手動確認(rèn),隊列刪除此消息,并將消息持久化入MySQL
并推送報警,進(jìn)行人工處理和定時任務(wù)做補償。
3、重復(fù)消費
如何保證 MQ 的消費是冪等性,這個需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL
、或者redis
將消息持久化,通過再消息中的唯一性屬性校驗。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!