我們使用MQ作為消息中間件,傳輸一些消息的時(shí)候,必須考慮到消息丟失的可能。因?yàn)橛械臅r(shí)候消息丟失了,會(huì)產(chǎn)生很嚴(yán)重的后果,比如消息計(jì)費(fèi)數(shù)據(jù),跟錢有關(guān)的消息。
這篇文章我們以Ro
cketMQ為例來講解,如何設(shè)計(jì)一套全鏈路消息不丟失的方案。
接下來我們分別講下生產(chǎn)者、broker、消費(fèi)者,如何確保消息不丟失的。
1、生產(chǎn)者如何確保消息不丟失?
發(fā)送消息的時(shí)候,可能存在消息的丟失,就是說可能消息根本就沒有進(jìn)入到MQ就丟了,我們看下面的圖。
圖1 生產(chǎn)者丟失消息
解決生產(chǎn)者丟失消息,一般有兩種方法。
(1)重試發(fā)消息
RocketMQ生產(chǎn)者發(fā)送消息一般有三種api:
-
同步發(fā)送
-
異步發(fā)送
-
OneWay 發(fā)送
同步發(fā)送,就是生產(chǎn)者向broker發(fā)送消息,阻塞當(dāng)前線程等待broker響應(yīng)發(fā)送結(jié)果。
異步發(fā)送,就是生產(chǎn)者首先創(chuàng)建一個(gè)向broker發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。
Oneway發(fā)送,就是生產(chǎn)者只負(fù)責(zé)發(fā)送請求,不等待應(yīng)答,生產(chǎn)者只負(fù)責(zé)把請求發(fā)出去,而不處理響應(yīng)結(jié)果。
為了確保消息一定發(fā)送到了broker,我們可以采用同步發(fā)送的方式,然后等待發(fā)送的結(jié)果。一直等待,如果消息發(fā)送失敗,或者M(jìn)Q內(nèi)部異常,我們肯定會(huì)收到一個(gè)異常,比如請求超時(shí),或者網(wǎng)絡(luò)錯(cuò)誤。
如果我們在收到異常之后,就認(rèn)為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應(yīng)給我們,這樣反復(fù)重試,是否可以確保消息一定會(huì)到達(dá)MQ?
理論上一些短暫網(wǎng)絡(luò)異常的場景下,我們是可以通過不停的重試去保證消息到達(dá)MQ的,因?yàn)槿绻虝r(shí)間網(wǎng)絡(luò)異常了消息一直沒法發(fā)送,我們只要不停的重試,網(wǎng)絡(luò)一旦恢復(fù)了,消息就可以發(fā)送到MQ了。
如果要是反復(fù)重試多次還是沒法把消息投遞到MQ,此時(shí)我們就可以直接當(dāng)作消息發(fā)送失敗了。
其代碼就像是這樣的:
try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage();} catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。} 另外,如果你是本地先執(zhí)行一些數(shù)據(jù)庫操作,再把消息發(fā)送到RocketMQ,那么就需要注意把本地事務(wù)與發(fā)送消息到RocketMQ放在一個(gè)事務(wù)里,保證執(zhí)行本地事務(wù)和發(fā)送消息要么一起成功,要么一起失敗。
@Transactional(rollbackFor = Exception.class)public void payOrderSuccess() // 執(zhí)行本地事務(wù) try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage(); } catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。 throw new Exception(); }} 不過使用這種方式,要考慮到接口耗時(shí)問題,如果網(wǎng)絡(luò)異常,發(fā)送消息到RocketMQ的請求每次都到超時(shí)才返回,那么多次重試可能耗時(shí)很久,導(dǎo)致調(diào)用payOrderSuccess方法的接口超時(shí)異常。
(2)RocketMQ事務(wù)
RocketMQ支持事務(wù)消息機(jī)制,用事務(wù)機(jī)制保證生產(chǎn)者消息發(fā)送成功,這個(gè)方案在業(yè)內(nèi)還是比較常用的。這個(gè)方案落地之后,他可以保證你的本地事務(wù)一旦成功,那么消息必然會(huì)被投遞到MQ中去,業(yè)務(wù)系統(tǒng)的數(shù)據(jù)也是一致的。
MQ事務(wù)機(jī)制原理還是有一點(diǎn)復(fù)雜的,放著這里講,文章篇幅會(huì)過長,所以會(huì)單獨(dú)起一篇文章講解MQ事務(wù)機(jī)制。
不管是重試發(fā)消息的方法,還是事務(wù)機(jī)制,都會(huì)大大影響系統(tǒng)的吞吐量。
2、broker如何確保消息不丟失?
假如現(xiàn)在消息提交到MQ里去了,就一定不會(huì)丟失嗎?
消息進(jìn)入MQ后會(huì)先落到磁盤上,但寫磁盤的過程,并不是一下子就寫到磁盤上的,而是先進(jìn)入os cache,再由操作系統(tǒng)的線程不定時(shí)刷到磁盤上去。
假如此時(shí)這臺(tái)機(jī)器突然宕機(jī)了,os cache里的數(shù)據(jù)就全部丟失了,此時(shí)必然導(dǎo)致你的消息丟失。
那怎么去確保消息寫入MQ之后,MQ自己不要隨便丟失數(shù)據(jù)呢?
解決這個(gè)問題的第一個(gè)關(guān)鍵點(diǎn),就是要知道broker的刷盤策略。broker的刷盤策略有兩種:異步刷盤,同步刷盤。
異步刷盤,就是你的消息即使成功寫入了MQ,它也就在機(jī)器的os cache中,沒有進(jìn)入磁盤里,要過一會(huì)兒等操作系統(tǒng)自己把os cache里的數(shù)據(jù)實(shí)際刷入磁盤文件中去。
所以異步刷盤模式,寫入消息的吞吐量肯定是非常高的,畢竟消息只需要進(jìn)入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丟失的風(fēng)險(xiǎn)。
所以如果一定要確保數(shù)據(jù)零丟失的話,可以調(diào)整MQ的刷盤策略為同步刷盤。
RocketMQ broker的默認(rèn)刷盤策略為異步刷盤,即ASYNC_FLUSH??梢詫roker的配置文件中的flushDiskType配置設(shè)置為:SYNC_FLUSH同步刷盤。
同步刷盤之后,我們寫入MQ的每條消息,只要MQ告訴我們寫入成功了,那么就表示已經(jīng)進(jìn)入了磁盤文件了。
同步刷盤,broker就一定不會(huì)丟失數(shù)據(jù)嗎?如果broker磁盤損壞了呢?
接著我們就要講下,如何避免磁盤故障導(dǎo)致數(shù)據(jù)丟失。
其實(shí)也很簡單,我們必須要對Broker使用主從架構(gòu)的模式
也就是說,必須讓一個(gè)Master Broker有一個(gè)Slave Broker去同步它的數(shù)據(jù),而且你一條消息寫入成功,必須是讓slave Broker也寫入成功,保證數(shù)據(jù)有多個(gè)冗余的副本。
這樣一來,你一條消息只要寫入成功了,此時(shí)主從master Broker和slave broker上都有這條數(shù)據(jù)了,此時(shí)如果你的Master Broker的磁盤損壞了,但是Slave Broker上至少還是有數(shù)據(jù)的,數(shù)據(jù)是不會(huì)因?yàn)榇疟P故障而丟失的。
RocketMQ從4.5.0版本開始使用Dledger技術(shù)和基于Raft協(xié)議實(shí)現(xiàn),自動(dòng)故障轉(zhuǎn)移,有興趣的同學(xué)可以自行去查閱相關(guān)資料。
3 如何保證消費(fèi)者消息不丟失?
假如消費(fèi)者拿到了消息,就一定可以成功處理嗎?
如果消費(fèi)者從broker拿到一條信息了,但是消息目前還在它的內(nèi)存里,還沒執(zhí)行具體的業(yè)務(wù)邏輯,此時(shí)他就直接提交了這條消息的offset到broker去說自己已經(jīng)處理過了。
接著消費(fèi)者系統(tǒng)就直接崩潰了,內(nèi)存里的消息就沒了,業(yè)務(wù)邏輯也沒執(zhí)行,結(jié)果Broker已經(jīng)收到他提交的消息offset了,還以為他已經(jīng)處理完這條消息了。
等消費(fèi)者系統(tǒng)重啟的時(shí)候,就不會(huì)再次消費(fèi)這條消息了,因?yàn)橐呀?jīng)提交過offset,broker認(rèn)為你已經(jīng)成功消費(fèi)過這條消息了。
所以我們在這里,我們要明確一點(diǎn),即使你保證發(fā)送消息到MQ的時(shí)候絕對不會(huì)丟失,而且MQ收到消息之后一定不會(huì)把消息搞丟失,但是你的消費(fèi)者系統(tǒng)在獲取到消息之后還是可能會(huì)搞丟。
一般RocketMQ的消費(fèi)者中會(huì)注冊一個(gè)監(jiān)聽器,當(dāng)你的消費(fèi)者獲取到一批消息之后,就會(huì)回調(diào)你的這個(gè)監(jiān)聽器函數(shù),讓你來處理這一批消息。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { // 執(zhí)行業(yè)務(wù)邏輯 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 處理完畢后,才會(huì)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作為消費(fèi)成功的標(biāo)志,告訴RocketMQ,這批消息我已經(jīng)處理完畢了。
所以對于RocketMQ而言,只要你的消費(fèi)者系統(tǒng)是在這個(gè)監(jiān)聽器的函數(shù)中先處理一批消息,基于這批消息都執(zhí)行完了業(yè)務(wù)邏輯,然后返回了那個(gè)消費(fèi)成功的狀態(tài),接著才會(huì)去提交這批消息的offset到broker去。
所以在這個(gè)情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著消費(fèi)者系統(tǒng)崩潰了,此時(shí)是不會(huì)丟失消息的。
但是,如果是消費(fèi)者系統(tǒng)獲取到一批消息之后,還沒處理完,也就是還沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),自然沒提交這批消息的offset給broker呢,此時(shí)消費(fèi)者系統(tǒng)突然掛了,會(huì)怎么樣?
在這種情況下,你對一批消息都沒提交他的offset給broker,broker不會(huì)認(rèn)為你已經(jīng)處理完了這批消息,此時(shí)你的消費(fèi)者系統(tǒng)的一臺(tái)機(jī)器宕機(jī)了,它其實(shí)會(huì)感知到你的消費(fèi)者系統(tǒng)的一臺(tái)機(jī)器作為一個(gè)Consumer掛了,它會(huì)把你沒處理完的那批消息交給生產(chǎn)者系統(tǒng)的其他機(jī)器去進(jìn)行處理,所以在這種情況下,消息也絕對是不會(huì)丟失的。
在默認(rèn)的Consumer的消費(fèi)模式之下,必須是你處理完一批消息了,才會(huì)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),表示消息都處理結(jié)束了,去提交offset到broker去。在這種情況下,一般來說是不會(huì)丟失消息的,即使你一個(gè)Consumer宕機(jī)了,他會(huì)把你沒處理完的消息交給其他Consumer去處理。
但是這里我們要注意一點(diǎn),就是我們不能在代碼中對消息進(jìn)行異步的處理,假如我們開啟了一個(gè)線程去處理這批消息,然后啟動(dòng)線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { new Thread() { public void run() { // 執(zhí)行業(yè)務(wù)邏輯 } }.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 如果要是用這種方式來處理消息的話,那可能就會(huì)出現(xiàn)你開啟的線程還沒處理完消息呢,已經(jīng)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認(rèn)為已經(jīng)處理結(jié)束了。
然后此時(shí)你消費(fèi)者系統(tǒng)突然宕機(jī),必然會(huì)導(dǎo)致你的消息丟失了!
因此在使用RocketMQ的場景下,我們?nèi)绻WC消費(fèi)數(shù)據(jù)的時(shí)候別丟消息,你就老老實(shí)實(shí)的在回調(diào)函數(shù)里處理消息,處理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)表明你處理完畢了。
總結(jié):
基于RocketMQ設(shè)計(jì)一套全鏈路消息不丟失方案,需要確保生產(chǎn)者、broker、消費(fèi)者三者都不丟失數(shù)據(jù)。
(1)生產(chǎn)者不丟失消息
方案1:同步發(fā)送消息 失敗重試;
方案2:事務(wù)消息機(jī)制;
(2)broker不丟失消息,開啟同步刷盤策略 主從架構(gòu)同步機(jī)制。
只要讓一個(gè)master Broker收到消息之后同步寫入磁盤,同時(shí)同步復(fù)制給其他slave Broker,再返回成功響應(yīng)給生產(chǎn)者,此時(shí)就可以保證MQ自己不會(huì)弄丟消息
(3)消費(fèi)者不丟失消息,采用RocketMQ的消費(fèi)者天然就可以保證你處理完消息之后,才會(huì)提交消息的offset到broker去,不過別采用多線程異步處理消息的方式。
雖然這一整套消息不丟失方案,可以確保消息流轉(zhuǎn)過程中不丟失。但顯而易見的是,你用了這套方案之后,會(huì)讓你整個(gè)從頭到尾的消息流轉(zhuǎn)鏈路的性能大幅度下降,讓你的MQ的吞吐量大幅度的下降。
所以一般大家不要隨便一個(gè)業(yè)務(wù)里就上如此重的一套方案,要明白這背后的成本!
一般我們建議,對于跟金錢、交易以及核心數(shù)據(jù)相關(guān)的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。
而對于其他大部分沒那么核心的場景和系統(tǒng),其實(shí)即使丟失一些數(shù)據(jù),也不會(huì)導(dǎo)致太大的問題,此時(shí)可以不采取這些方案,或者說你可以在其他的場景里做一些簡化。
ckname="架構(gòu)師社區(qū)" data-alias="devabc" data-signature="架構(gòu)師社區(qū),專注分享架構(gòu)師技術(shù)干貨,架構(gòu)師行業(yè)秘聞,匯集各類奇妙好玩的架構(gòu)師話題和流行的架構(gòu)師動(dòng)向!" data-from="0">
本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時(shí)聯(lián)系本站刪除。