當(dāng)前位置:首頁 > 公眾號(hào)精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]我們使用MQ作為消息中間件,傳輸一些消息的時(shí)候,必須考慮到消息丟失的可能。因?yàn)橛械臅r(shí)候消息丟失了,會(huì)產(chǎn)生很嚴(yán)重的后果,比如消息計(jì)費(fèi)數(shù)據(jù),跟錢有關(guān)的消息。這篇文章我們以RocketMQ為例來講解,如何設(shè)計(jì)一套全鏈路消息不丟失的方案。接下來我們分別講下生產(chǎn)者、broker、消費(fèi)者,如...

我們使用MQ作為消息中間件,傳輸一些消息的時(shí)候,必須考慮到消息丟失的可能。因?yàn)橛械臅r(shí)候消息丟失了,會(huì)產(chǎn)生很嚴(yán)重的后果,比如消息計(jì)費(fèi)數(shù)據(jù),跟錢有關(guān)的消息。



這篇文章我們以RocketMQ為例來講解,如何設(shè)計(jì)一套全鏈路消息不丟失的方案。



接下來我們分別講下生產(chǎn)者、broker、消費(fèi)者,如何確保消息不丟失的。



1、生產(chǎn)者如何確保消息不丟失?



發(fā)送消息的時(shí)候,可能存在消息的丟失,就是說可能消息根本就沒有進(jìn)入到MQ就丟了,我們看下面的圖。



圖1 生產(chǎn)者丟失消息



解決生產(chǎn)者丟失消息,一般有兩種方法。



(1)重試發(fā)消



RocketMQ生產(chǎn)者發(fā)送消息一般有三種api:
  • 同步發(fā)送
  • 異步發(fā)送
  • OneWay 發(fā)送


同步發(fā)送,就是生產(chǎn)者向broker發(fā)送消息,阻塞當(dāng)前線程等待broker響應(yīng)發(fā)送結(jié)果。



異步發(fā)送,就是生產(chǎn)者首先創(chuàng)建一個(gè)向broker發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。



Oneway發(fā)送,就是生產(chǎn)者只負(fù)責(zé)發(fā)送請求,不等待應(yīng)答,生產(chǎn)者只負(fù)責(zé)把請求發(fā)出去,而不處理響應(yīng)結(jié)果。



為了確保消息一定發(fā)送到了broker,我們可以采用同步發(fā)送的方式,然后等待發(fā)送的結(jié)果。一直等待,如果消息發(fā)送失敗,或者M(jìn)Q內(nèi)部異常,我們肯定會(huì)收到一個(gè)異常,比如請求超時(shí),或者網(wǎng)絡(luò)錯(cuò)誤。



如果我們在收到異常之后,就認(rèn)為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應(yīng)給我們,這樣反復(fù)重試,是否可以確保消息一定會(huì)到達(dá)MQ?




理論上一些短暫網(wǎng)絡(luò)異常的場景下,我們是可以通過不停的重試去保證消息到達(dá)MQ的,因?yàn)槿绻虝r(shí)間網(wǎng)絡(luò)異常了消息一直沒法發(fā)送,我們只要不停的重試,網(wǎng)絡(luò)一旦恢復(fù)了,消息就可以發(fā)送到MQ了。



如果要是反復(fù)重試多次還是沒法把消息投遞到MQ,此時(shí)我們就可以直接當(dāng)作消息發(fā)送失敗了。



其代碼就像是這樣的:


try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage();} catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。} 另外,如果你是本地先執(zhí)行一些數(shù)據(jù)庫操作,再把消息發(fā)送到RocketMQ,那么就需要注意把本地事務(wù)與發(fā)送消息到RocketMQ放在一個(gè)事務(wù)里,保證執(zhí)行本地事務(wù)和發(fā)送消息要么一起成功,要么一起失敗。


@Transactional(rollbackFor = Exception.class)public void payOrderSuccess() // 執(zhí)行本地事務(wù) try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage(); } catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。 throw new Exception(); }} 不過使用這種方式,要考慮到接口耗時(shí)問題,如果網(wǎng)絡(luò)異常,發(fā)送消息到RocketMQ的請求每次都到超時(shí)才返回,那么多次重試可能耗時(shí)很久,導(dǎo)致調(diào)用payOrderSuccess方法的接口超時(shí)異常。




(2)RocketMQ事務(wù)



RocketMQ支持事務(wù)消息機(jī)制,用事務(wù)機(jī)制保證生產(chǎn)者消息發(fā)送成功,這個(gè)方案在業(yè)內(nèi)還是比較常用的。這個(gè)方案落地之后,他可以保證你的本地事務(wù)一旦成功,那么消息必然會(huì)被投遞到MQ中去,業(yè)務(wù)系統(tǒng)的數(shù)據(jù)也是一致的。



MQ事務(wù)機(jī)制原理還是有一點(diǎn)復(fù)雜的,放著這里講,文章篇幅會(huì)過長,所以會(huì)單獨(dú)起一篇文章講解MQ事務(wù)機(jī)制。



不管是重試發(fā)消息的方法,還是事務(wù)機(jī)制,都會(huì)大大影響系統(tǒng)的吞吐量。



2、broker如何確保消息不丟失?



假如現(xiàn)在消息提交到MQ里去了,就一定不會(huì)丟失嗎?



消息進(jìn)入MQ后會(huì)先落到磁盤上,但寫磁盤的過程,并不是一下子就寫到磁盤上的,而是先進(jìn)入os cache,再由操作系統(tǒng)的線程不定時(shí)刷到磁盤上去。



假如此時(shí)這臺(tái)機(jī)器突然宕機(jī)了,os cache里的數(shù)據(jù)就全部丟失了,此時(shí)必然導(dǎo)致你的消息丟失。





那怎么去確保消息寫入MQ之后,MQ自己不要隨便丟失數(shù)據(jù)呢?



解決這個(gè)問題的第一個(gè)關(guān)鍵點(diǎn),就是要知道broker的刷盤策略。broker的刷盤策略有兩種:異步刷盤,同步刷盤。



異步刷盤,就是你的消息即使成功寫入了MQ,它也就在機(jī)器的os cache中,沒有進(jìn)入磁盤里,要過一會(huì)兒等操作系統(tǒng)自己把os cache里的數(shù)據(jù)實(shí)際刷入磁盤文件中去。



所以異步刷盤模式,寫入消息的吞吐量肯定是非常高的,畢竟消息只需要進(jìn)入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丟失的風(fēng)險(xiǎn)。



所以如果一定要確保數(shù)據(jù)零丟失的話,可以調(diào)整MQ的刷盤策略為同步刷盤。



RocketMQ broker的默認(rèn)刷盤策略為異步刷盤,即ASYNC_FLUSH??梢詫roker的配置文件中的flushDiskType配置設(shè)置為:SYNC_FLUSH同步刷盤。



同步刷盤之后,我們寫入MQ的每條消息,只要MQ告訴我們寫入成功了,那么就表示已經(jīng)進(jìn)入了磁盤文件了。



同步刷盤,broker就一定不會(huì)丟失數(shù)據(jù)嗎?如果broker磁盤損壞了呢?



接著我們就要講下,如何避免磁盤故障導(dǎo)致數(shù)據(jù)丟失。



其實(shí)也很簡單,我們必須要對Broker使用主從架構(gòu)的模式



也就是說,必須讓一個(gè)Master Broker有一個(gè)Slave Broker去同步它的數(shù)據(jù),而且你一條消息寫入成功,必須是讓slave Broker也寫入成功,保證數(shù)據(jù)有多個(gè)冗余的副本。





這樣一來,你一條消息只要寫入成功了,此時(shí)主從master Broker和slave broker上都有這條數(shù)據(jù)了,此時(shí)如果你的Master Broker的磁盤損壞了,但是Slave Broker上至少還是有數(shù)據(jù)的,數(shù)據(jù)是不會(huì)因?yàn)榇疟P故障而丟失的。



RocketMQ從4.5.0版本開始使用Dledger技術(shù)和基于Raft協(xié)議實(shí)現(xiàn),自動(dòng)故障轉(zhuǎn)移,有興趣的同學(xué)可以自行去查閱相關(guān)資料。



3 如何保證消費(fèi)者消息不丟失?



假如消費(fèi)者拿到了消息,就一定可以成功處理嗎?




如果消費(fèi)者從broker拿到一條信息了,但是消息目前還在它的內(nèi)存里,還沒執(zhí)行具體的業(yè)務(wù)邏輯,此時(shí)他就直接提交了這條消息的offset到broker去說自己已經(jīng)處理過了。



接著消費(fèi)者系統(tǒng)就直接崩潰了,內(nèi)存里的消息就沒了,業(yè)務(wù)邏輯也沒執(zhí)行,結(jié)果Broker已經(jīng)收到他提交的消息offset了,還以為他已經(jīng)處理完這條消息了。



等消費(fèi)者系統(tǒng)重啟的時(shí)候,就不會(huì)再次消費(fèi)這條消息了,因?yàn)橐呀?jīng)提交過offset,broker認(rèn)為你已經(jīng)成功消費(fèi)過這條消息了。



所以我們在這里,我們要明確一點(diǎn),即使你保證發(fā)送消息到MQ的時(shí)候絕對不會(huì)丟失,而且MQ收到消息之后一定不會(huì)把消息搞丟失,但是你的消費(fèi)者系統(tǒng)在獲取到消息之后還是可能會(huì)搞丟。



一般RocketMQ的消費(fèi)者中會(huì)注冊一個(gè)監(jiān)聽器,當(dāng)你的消費(fèi)者獲取到一批消息之后,就會(huì)回調(diào)你的這個(gè)監(jiān)聽器函數(shù),讓你來處理這一批消息。


consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { // 執(zhí)行業(yè)務(wù)邏輯 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 處理完畢后,才會(huì)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作為消費(fèi)成功的標(biāo)志,告訴RocketMQ,這批消息我已經(jīng)處理完畢了。



所以對于RocketMQ而言,只要你的消費(fèi)者系統(tǒng)是在這個(gè)監(jiān)聽器的函數(shù)中先處理一批消息,基于這批消息都執(zhí)行完了業(yè)務(wù)邏輯,然后返回了那個(gè)消費(fèi)成功的狀態(tài),接著才會(huì)去提交這批消息的offset到broker去。



所以在這個(gè)情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著消費(fèi)者系統(tǒng)崩潰了,此時(shí)是不會(huì)丟失消息的。



但是,如果是消費(fèi)者系統(tǒng)獲取到一批消息之后,還沒處理完,也就是還沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),自然沒提交這批消息的offset給broker呢,此時(shí)消費(fèi)者系統(tǒng)突然掛了,會(huì)怎么樣?



在這種情況下,你對一批消息都沒提交他的offset給broker,broker不會(huì)認(rèn)為你已經(jīng)處理完了這批消息,此時(shí)你的消費(fèi)者系統(tǒng)的一臺(tái)機(jī)器宕機(jī)了,它其實(shí)會(huì)感知到你的消費(fèi)者系統(tǒng)的一臺(tái)機(jī)器作為一個(gè)Consumer掛了,它會(huì)把你沒處理完的那批消息交給生產(chǎn)者系統(tǒng)的其他機(jī)器去進(jìn)行處理,所以在這種情況下,消息也絕對是不會(huì)丟失的。



在默認(rèn)的Consumer的消費(fèi)模式之下,必須是你處理完一批消息了,才會(huì)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個(gè)狀態(tài),表示消息都處理結(jié)束了,去提交offset到broker去。在這種情況下,一般來說是不會(huì)丟失消息的,即使你一個(gè)Consumer宕機(jī)了,他會(huì)把你沒處理完的消息交給其他Consumer去處理。



但是這里我們要注意一點(diǎn),就是我們不能在代碼中對消息進(jìn)行異步的處理,假如我們開啟了一個(gè)線程去處理這批消息,然后啟動(dòng)線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。


consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { new Thread() { public void run() { // 執(zhí)行業(yè)務(wù)邏輯 } }.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 如果要是用這種方式來處理消息的話,那可能就會(huì)出現(xiàn)你開啟的線程還沒處理完消息呢,已經(jīng)返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認(rèn)為已經(jīng)處理結(jié)束了。



然后此時(shí)你消費(fèi)者系統(tǒng)突然宕機(jī),必然會(huì)導(dǎo)致你的消息丟失了!



因此在使用RocketMQ的場景下,我們?nèi)绻WC消費(fèi)數(shù)據(jù)的時(shí)候別丟消息,你就老老實(shí)實(shí)的在回調(diào)函數(shù)里處理消息,處理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)表明你處理完畢了。



總結(jié):



基于RocketMQ設(shè)計(jì)一套全鏈路消息不丟失方案,需要確保生產(chǎn)者、broker、消費(fèi)者三者都不丟失數(shù)據(jù)。



(1)生產(chǎn)者不丟失消息



方案1:同步發(fā)送消息  失敗重試;
方案2:事務(wù)消息機(jī)制;
(2)broker不丟失消息,開啟同步刷盤策略 主從架構(gòu)同步機(jī)制。



只要讓一個(gè)master Broker收到消息之后同步寫入磁盤,同時(shí)同步復(fù)制給其他slave Broker,再返回成功響應(yīng)給生產(chǎn)者,此時(shí)就可以保證MQ自己不會(huì)弄丟消息



(3)消費(fèi)者不丟失消息,采用RocketMQ的消費(fèi)者天然就可以保證你處理完消息之后,才會(huì)提交消息的offset到broker去,不過別采用多線程異步處理消息的方式。



雖然這一整套消息不丟失方案,可以確保消息流轉(zhuǎn)過程中不丟失。但顯而易見的是,你用了這套方案之后,會(huì)讓你整個(gè)從頭到尾的消息流轉(zhuǎn)鏈路的性能大幅度下降,讓你的MQ的吞吐量大幅度的下降。



所以一般大家不要隨便一個(gè)業(yè)務(wù)里就上如此重的一套方案,要明白這背后的成本!



一般我們建議,對于跟金錢、交易以及核心數(shù)據(jù)相關(guān)的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。



而對于其他大部分沒那么核心的場景和系統(tǒng),其實(shí)即使丟失一些數(shù)據(jù),也不會(huì)導(dǎo)致太大的問題,此時(shí)可以不采取這些方案,或者說你可以在其他的場景里做一些簡化。


ckname="架構(gòu)師社區(qū)" data-alias="devabc" data-signature="架構(gòu)師社區(qū),專注分享架構(gòu)師技術(shù)干貨,架構(gòu)師行業(yè)秘聞,匯集各類奇妙好玩的架構(gòu)師話題和流行的架構(gòu)師動(dòng)向!" data-from="0">


本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

廣告科技領(lǐng)導(dǎo)者Kira LeBlanc晉升為全球首席營銷官  蒙特利爾和多倫多2022年4月1日 /美通社/ -- 全球最大的獨(dú)立程序化數(shù)字戶外(DOOH)廣告技術(shù)公司之一Hivestack今天宣布...

關(guān)鍵字: ck

(全球TMT2022年4月1日訊)獨(dú)立程序化數(shù)字戶外(DOOH)廣告技術(shù)公司Hivestack宣布任命Kira LeBlanc為全球首席營銷官。LeBlanc于2021年初Hivestack宣布其全球擴(kuò)張計(jì)劃時(shí)加入該公...

關(guān)鍵字: ck

2021年全年多項(xiàng)業(yè)績指標(biāo)再創(chuàng)新高; “企業(yè)數(shù)字化運(yùn)營解決方案”全年收入持續(xù)三位數(shù)同比增長; “SaaS+X”商業(yè)模式為“企業(yè)數(shù)字化運(yùn)營解決方案”的迅猛增長...

關(guān)鍵字: ic ck

(全球TMT2022年3月24日訊)Shutterstock, Inc.是一個(gè)全球領(lǐng)先的創(chuàng)意平臺(tái),為眾多品牌、企業(yè)和媒體公司提供全方位服務(wù)解決方案、高質(zhì)量內(nèi)容及創(chuàng)意工作流程解決方案。該公司宣布在其已有十年傳統(tǒng)的年度奧斯...

關(guān)鍵字: ck

在其推出年度“奧斯卡流行藝術(shù)!”活動(dòng)系列10周年之際,Shutterstock內(nèi)部創(chuàng)意團(tuán)隊(duì)立足其平臺(tái)逾4億創(chuàng)意資產(chǎn),創(chuàng)作原創(chuàng)波普藝術(shù)風(fēng)格作品...

關(guān)鍵字: ck

倫敦2022年3月15日 /美通社/ -- Warwick Investment Group在貝爾格拉維亞的伊布里大道(Ebury Street)收購了五處相毗鄰的永久產(chǎn)權(quán)房產(chǎn),共25套公寓,由此完成了該公司迄今為止規(guī)模...

關(guān)鍵字: ic ck

Hivestack 任命前三星廣告 AdTech 資深人士 Mina Naguib 擔(dān)任首席技術(shù)官 加拿大蒙特利爾2022年2月7 日 /美通社/ -- Hivestack——全球領(lǐng)先的獨(dú)...

關(guān)鍵字: ck

(全球TMT2022年2月7日訊)獨(dú)立程序化數(shù)字戶外?(DOOH) 廣告技術(shù)公司Hivestack,宣布聘請前三星廣告技術(shù)資深人士?Mina Naguib 擔(dān)任首席技術(shù)官。Naguib 將直接向首席執(zhí)行官?Andrea...

關(guān)鍵字: 三星 ck

新加坡、菲律賓馬尼拉和曼谷2022年1月26日 /美通社/ -- 東南亞技術(shù)驅(qū)動(dòng)型物流平臺(tái) Inteluck 宣布,今天已獲得 1500 萬美元的&n...

關(guān)鍵字: ck

- 這家全球支付處理商的估值達(dá)到400億美元,迄今已累計(jì)籌得18億美元 - 主要投資者包括奧特米特(Altimeter)、德龍集團(tuán)(Dragoneer)、富蘭克林鄧普頓(F...

關(guān)鍵字: ck

架構(gòu)師社區(qū)

1739 篇文章

關(guān)注

發(fā)布文章

編輯精選

技術(shù)子站

關(guān)閉