《我想進(jìn)大廠》之MQ奪命連環(huán)11問
繼之前的mysql奪命連環(huán)之后,我發(fā)現(xiàn)我這個(gè)標(biāo)題被好多套用的,什么奪命zookeeper,奪命多線程一大堆,這一次,開始面試題系列MQ專題,消息隊(duì)列作為日常常見的使用中間件,面試也是必問的點(diǎn)之一,一起來看看MQ的面試題。
你們?yōu)槭裁词褂胢q?具體的使用場(chǎng)景是什么?
mq的作用很簡(jiǎn)單,削峰填谷。以電商交易下單的場(chǎng)景來說,正向交易的過程可能涉及到創(chuàng)建訂單、扣減庫存、扣減活動(dòng)預(yù)算、扣減積分等等。每個(gè)接口的耗時(shí)如果是100ms,那么理論上整個(gè)下單的鏈路就需要耗費(fèi)400ms,這個(gè)時(shí)間顯然是太長(zhǎng)了。
如果這些操作全部同步處理的話,首先調(diào)用鏈路太長(zhǎng)影響接口性能,其次分布式事務(wù)的問題很難處理,這時(shí)候像扣減預(yù)算和積分這種對(duì)實(shí)時(shí)一致性要求沒有那么高的請(qǐng)求,完全就可以通過mq異步的方式去處理了。同時(shí),考慮到異步帶來的不一致的問題,我們可以通過job去重試保證接口調(diào)用成功,而且一般公司都會(huì)有核對(duì)的平臺(tái),比如下單成功但是未扣減積分的這種問題可以通過核對(duì)作為兜底的處理方案。
使用mq之后我們的鏈路變簡(jiǎn)單了,同時(shí)異步發(fā)送消息我們的整個(gè)系統(tǒng)的抗壓能力也上升了。
那你們使用什么mq?基于什么做的選型?
我們主要調(diào)研了幾個(gè)主流的mq,kafka、rabbitmq、rocketmq、activemq,選型我們主要基于以下幾個(gè)點(diǎn)去考慮:
-
由于我們系統(tǒng)的qps壓力比較大,所以性能是首要考慮的要素。 -
開發(fā)語言,由于我們的開發(fā)語言是java,主要是為了方便二次開發(fā)。 -
對(duì)于高并發(fā)的業(yè)務(wù)場(chǎng)景是必須的,所以需要支持分布式架構(gòu)的設(shè)計(jì)。 -
功能全面,由于不同的業(yè)務(wù)場(chǎng)景,可能會(huì)用到順序消息、事務(wù)消息等。
基于以上幾個(gè)考慮,我們最終選擇了RocketMQ。
Kafka | RocketMQ | RabbitMQ | ActiveMQ | |
---|---|---|---|---|
單機(jī)吞吐量 | 10萬級(jí) | 10萬級(jí) | 萬級(jí) | 萬級(jí) |
開發(fā)語言 | Scala | Java | Erlang | Java |
高可用 | 分布式架構(gòu) | 分布式架構(gòu) | 主從架構(gòu) | 主從架構(gòu) |
性能 | ms級(jí) | ms級(jí) | us級(jí) | ms級(jí) |
功能 | 只支持主要的MQ功能 | 順序消息、事務(wù)消息等功能完善 | 并發(fā)強(qiáng)、性能好、延時(shí)低 | 成熟的社區(qū)產(chǎn)品、文檔豐富 |
你上面提到異步發(fā)送,那消息可靠性怎么保證?
消息丟失可能發(fā)生在生產(chǎn)者發(fā)送消息、MQ本身丟失消息、消費(fèi)者丟失消息3個(gè)方面。
生產(chǎn)者丟失
生產(chǎn)者丟失消息的可能點(diǎn)在于程序發(fā)送失敗拋異常了沒有重試處理,或者發(fā)送的過程成功但是過程中網(wǎng)絡(luò)閃斷MQ沒收到,消息就丟失了。
由于同步發(fā)送的一般不會(huì)出現(xiàn)這樣使用方式,所以我們就不考慮同步發(fā)送的問題,我們基于異步發(fā)送的場(chǎng)景來說。
異步發(fā)送分為兩個(gè)方式:異步有回調(diào)和異步無回調(diào),無回調(diào)的方式,生產(chǎn)者發(fā)送完后不管結(jié)果可能就會(huì)造成消息丟失,而通過異步發(fā)送+回調(diào)通知+本地消息表的形式我們就可以做出一個(gè)解決方案。以下單的場(chǎng)景舉例。
-
下單后先保存本地?cái)?shù)據(jù)和MQ消息表,這時(shí)候消息的狀態(tài)是發(fā)送中,如果本地事務(wù)失敗,那么下單失敗,事務(wù)回滾。 -
下單成功,直接返回客戶端成功,異步發(fā)送MQ消息 -
MQ回調(diào)通知消息發(fā)送結(jié)果,對(duì)應(yīng)更新數(shù)據(jù)庫MQ發(fā)送狀態(tài) -
JOB輪詢超過一定時(shí)間(時(shí)間根據(jù)業(yè)務(wù)配置)還未發(fā)送成功的消息去重試 -
在監(jiān)控平臺(tái)配置或者JOB程序處理超過一定次數(shù)一直發(fā)送不成功的消息,告警,人工介入。
一般而言,對(duì)于大部分場(chǎng)景來說異步回調(diào)的形式就可以了,只有那種需要完全保證不能丟失消息的場(chǎng)景我們做一套完整的解決方案。
MQ丟失
如果生產(chǎn)者保證消息發(fā)送到MQ,而MQ收到消息后還在內(nèi)存中,這時(shí)候宕機(jī)了又沒來得及同步給從節(jié)點(diǎn),就有可能導(dǎo)致消息丟失。
比如RocketMQ:
RocketMQ分為同步刷盤和異步刷盤兩種方式,默認(rèn)的是異步刷盤,就有可能導(dǎo)致消息還未刷到硬盤上就丟失了,可以通過設(shè)置為同步刷盤的方式來保證消息可靠性,這樣即使MQ掛了,恢復(fù)的時(shí)候也可以從磁盤中去恢復(fù)消息。
比如Kafka也可以通過配置做到:
acks=all 只有參與復(fù)制的所有節(jié)點(diǎn)全部收到消息,才返回生產(chǎn)者成功。這樣的話除非所有的節(jié)點(diǎn)都掛了,消息才會(huì)丟失。
replication.factor=N,設(shè)置大于1的數(shù),這會(huì)要求每個(gè)partion至少有2個(gè)副本
min.insync.replicas=N,設(shè)置大于1的數(shù),這會(huì)要求leader至少感知到一個(gè)follower還保持著連接
retries=N,設(shè)置一個(gè)非常大的值,讓生產(chǎn)者發(fā)送失敗一直重試
雖然我們可以通過配置的方式來達(dá)到MQ本身高可用的目的,但是都對(duì)性能有損耗,怎樣配置需要根據(jù)業(yè)務(wù)做出權(quán)衡。
消費(fèi)者丟失
消費(fèi)者丟失消息的場(chǎng)景:消費(fèi)者剛收到消息,此時(shí)服務(wù)器宕機(jī),MQ認(rèn)為消費(fèi)者已經(jīng)消費(fèi),不會(huì)重復(fù)發(fā)送消息,消息丟失。
RocketMQ默認(rèn)是需要消費(fèi)者回復(fù)ack確認(rèn),而kafka需要手動(dòng)開啟配置關(guān)閉自動(dòng)offset。
消費(fèi)方不返回ack確認(rèn),重發(fā)的機(jī)制根據(jù)MQ類型的不同發(fā)送時(shí)間間隔、次數(shù)都不盡相同,如果重試超過次數(shù)之后會(huì)進(jìn)入死信隊(duì)列,需要手工來處理了。(Kafka沒有這些)
你說到消費(fèi)者消費(fèi)失敗的問題,那么如果一直消費(fèi)失敗導(dǎo)致消息積壓怎么處理?
因?yàn)榭紤]到時(shí)消費(fèi)者消費(fèi)一直出錯(cuò)的問題,那么我們可以從以下幾個(gè)角度來考慮:
-
消費(fèi)者出錯(cuò),肯定是程序或者其他問題導(dǎo)致的,如果容易修復(fù),先把問題修復(fù),讓consumer恢復(fù)正常消費(fèi) -
如果時(shí)間來不及處理很麻煩,做轉(zhuǎn)發(fā)處理,寫一個(gè)臨時(shí)的consumer消費(fèi)方案,先把消息消費(fèi),然后再轉(zhuǎn)發(fā)到一個(gè)新的topic和MQ資源,這個(gè)新的topic的機(jī)器資源單獨(dú)申請(qǐng),要能承載住當(dāng)前積壓的消息 -
處理完積壓數(shù)據(jù)后,修復(fù)consumer,去消費(fèi)新的MQ和現(xiàn)有的MQ數(shù)據(jù),新MQ消費(fèi)完成后恢復(fù)原狀
那如果消息積壓達(dá)到磁盤上限,消息被刪除了怎么辦?
這。。。他媽都刪除了我有啥辦法啊。。。冷靜,再想想。。有了。
最初,我們發(fā)送的消息記錄是落庫保存了的,而轉(zhuǎn)發(fā)發(fā)送的數(shù)據(jù)也保存了,那么我們就可以通過這部分?jǐn)?shù)據(jù)來找到丟失的那部分?jǐn)?shù)據(jù),再單獨(dú)跑個(gè)腳本重發(fā)就可以了。如果轉(zhuǎn)發(fā)的程序沒有落庫,那就和消費(fèi)方的記錄去做對(duì)比,只是過程會(huì)更艱難一點(diǎn)。
說了這么多,那你說說RocketMQ實(shí)現(xiàn)原理吧?
RocketMQ由NameServer注冊(cè)中心集群、Producer生產(chǎn)者集群、Consumer消費(fèi)者集群和若干Broker(RocketMQ進(jìn)程)組成,它的架構(gòu)原理是這樣的:
-
Broker在啟動(dòng)的時(shí)候去向所有的NameServer注冊(cè),并保持長(zhǎng)連接,每30s發(fā)送一次心跳 -
Producer在發(fā)送消息的時(shí)候從NameServer獲取Broker服務(wù)器地址,根據(jù)負(fù)載均衡算法選擇一臺(tái)服務(wù)器來發(fā)送消息 -
Conusmer消費(fèi)消息的時(shí)候同樣從NameServer獲取Broker地址,然后主動(dòng)拉取消息來消費(fèi)
為什么RocketMQ不使用Zookeeper作為注冊(cè)中心呢?
我認(rèn)為有以下幾個(gè)點(diǎn)是不使用zookeeper的原因:
-
根據(jù)CAP理論,同時(shí)最多只能滿足兩個(gè)點(diǎn),而zookeeper滿足的是CP,也就是說zookeeper并不能保證服務(wù)的可用性,zookeeper在進(jìn)行選舉的時(shí)候,整個(gè)選舉的時(shí)間太長(zhǎng),期間整個(gè)集群都處于不可用的狀態(tài),而這對(duì)于一個(gè)注冊(cè)中心來說肯定是不能接受的,作為服務(wù)發(fā)現(xiàn)來說就應(yīng)該是為可用性而設(shè)計(jì)。 -
基于性能的考慮,NameServer本身的實(shí)現(xiàn)非常輕量,而且可以通過增加機(jī)器的方式水平擴(kuò)展,增加集群的抗壓能力,而zookeeper的寫是不可擴(kuò)展的,而zookeeper要解決這個(gè)問題只能通過劃分領(lǐng)域,劃分多個(gè)zookeeper集群來解決,首先操作起來太復(fù)雜,其次這樣還是又違反了CAP中的A的設(shè)計(jì),導(dǎo)致服務(wù)之間是不連通的。 -
持久化的機(jī)制來帶的問題,ZooKeeper 的 ZAB 協(xié)議對(duì)每一個(gè)寫請(qǐng)求,會(huì)在每個(gè) ZooKeeper 節(jié)點(diǎn)上保持寫一個(gè)事務(wù)日志,同時(shí)再加上定期的將內(nèi)存數(shù)據(jù)鏡像(Snapshot)到磁盤來保證數(shù)據(jù)的一致性和持久性,而對(duì)于一個(gè)簡(jiǎn)單的服務(wù)發(fā)現(xiàn)的場(chǎng)景來說,這其實(shí)沒有太大的必要,這個(gè)實(shí)現(xiàn)方案太重了。而且本身存儲(chǔ)的數(shù)據(jù)應(yīng)該是高度定制化的。 -
消息發(fā)送應(yīng)該弱依賴注冊(cè)中心,而RocketMQ的設(shè)計(jì)理念也正是基于此,生產(chǎn)者在第一次發(fā)送消息的時(shí)候從NameServer獲取到Broker地址后緩存到本地,如果NameServer整個(gè)集群不可用,短時(shí)間內(nèi)對(duì)于生產(chǎn)者和消費(fèi)者并不會(huì)產(chǎn)生太大影響。
那Broker是怎么保存數(shù)據(jù)的呢?
RocketMQ主要的存儲(chǔ)文件包括commitlog文件、consumequeue文件、indexfile文件。
Broker在收到消息之后,會(huì)把消息保存到commitlog的文件當(dāng)中,而同時(shí)在分布式的存儲(chǔ)當(dāng)中,每個(gè)broker都會(huì)保存一部分topic的數(shù)據(jù),同時(shí),每個(gè)topic對(duì)應(yīng)的messagequeue下都會(huì)生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中會(huì)保存key和offset的對(duì)應(yīng)關(guān)系。
CommitLog文件保存于${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個(gè)文件默認(rèn)1G,寫滿后自動(dòng)生成一個(gè)新的文件。
由于同一個(gè)topic的消息并不是連續(xù)的存儲(chǔ)在commitlog中,消費(fèi)者如果直接從commitlog獲取消息效率非常低,所以通過consumequeue保存commitlog中消息的偏移量的物理地址,這樣消費(fèi)者在消費(fèi)的時(shí)候先從consumequeue中根據(jù)偏移量定位到具體的commitlog物理文件,然后根據(jù)一定的規(guī)則(offset和文件大小取模)在commitlog中快速定位。
Master和Slave之間是怎么同步數(shù)據(jù)的呢?
而消息在master和slave之間的同步是根據(jù)raft協(xié)議來進(jìn)行的:
-
在broker收到消息后,會(huì)被標(biāo)記為uncommitted狀態(tài) -
然后會(huì)把消息發(fā)送給所有的slave -
slave在收到消息之后返回ack響應(yīng)給master -
master在收到超過半數(shù)的ack之后,把消息標(biāo)記為committed -
發(fā)送committed消息給所有slave,slave也修改狀態(tài)為committed
你知道RocketMQ為什么速度快嗎?
是因?yàn)槭褂昧隧樞虼鎯?chǔ)、Page Cache和異步刷盤。
-
我們?cè)趯懭隿ommitlog的時(shí)候是順序?qū)懭氲?,這樣比隨機(jī)寫入的性能就會(huì)提高很多 -
寫入commitlog的時(shí)候并不是直接寫入磁盤,而是先寫入操作系統(tǒng)的PageCache -
最后由操作系統(tǒng)異步將緩存中的數(shù)據(jù)刷到磁盤
什么是事務(wù)、半事務(wù)消息?怎么實(shí)現(xiàn)的?
事務(wù)消息就是MQ提供的類似XA的分布式事務(wù)能力,通過事務(wù)消息可以達(dá)到分布式事務(wù)的最終一致性。
半事務(wù)消息就是MQ收到了生產(chǎn)者的消息,但是沒有收到二次確認(rèn),不能投遞的消息。
實(shí)現(xiàn)原理如下:
-
生產(chǎn)者先發(fā)送一條半事務(wù)消息到MQ -
MQ收到消息后返回ack確認(rèn) -
生產(chǎn)者開始執(zhí)行本地事務(wù) -
如果事務(wù)執(zhí)行成功發(fā)送commit到MQ,失敗發(fā)送rollback -
如果MQ長(zhǎng)時(shí)間未收到生產(chǎn)者的二次確認(rèn)commit或者rollback,MQ對(duì)生產(chǎn)者發(fā)起消息回查 -
生產(chǎn)者查詢事務(wù)執(zhí)行最終狀態(tài) -
根據(jù)查詢事務(wù)狀態(tài)再次提交二次確認(rèn)
最終,如果MQ收到二次確認(rèn)commit,就可以把消息投遞給消費(fèi)者,反之如果是rollback,消息會(huì)保存下來并且在3天后被刪除。
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問題,請(qǐng)聯(lián)系我們,謝謝!