快手基于 RocketMQ 的在線消息系統(tǒng)建設(shè)實踐
作者:黃理,10 多年軟件開發(fā)和架構(gòu)經(jīng)驗,熱衷于代碼和性能優(yōu)化,開發(fā)和參與過多個開源項目。曾在淘寶任業(yè)務(wù)架構(gòu)師多年,當前在快手負責在線消息系統(tǒng)建設(shè)工作。
為什么建設(shè)在線消息系統(tǒng)
在引入 RocketMQ 之前,快手已經(jīng)在大量的使用 Kafka 了,但并非所有情況下 Kafka 都是最合適的,比如以下場景:
- 業(yè)務(wù)希望個別消費失敗以后可以重試,并且不堵塞后續(xù)其它消息的消費。
- 業(yè)務(wù)希望消息可以延遲一段時間再投遞。
- 業(yè)務(wù)需要發(fā)送的時候保證數(shù)據(jù)庫操作和消息發(fā)送是一致的(也就是事務(wù)發(fā)送)。
- 為了排查問題,有的時候業(yè)務(wù)需要一定的單個消息查詢能力。
為了應(yīng)對以上這類場景,我們需要建設(shè)一個主要面向在線業(yè)務(wù)的消息系統(tǒng),作為 Kafka 的補充。在考察的一些消息中間件中,RocketMQ 和業(yè)務(wù)需求匹配度比較高,同時部署結(jié)構(gòu)簡單,使用的公司也比較多,于是最后我們就采用了 RocketMQ。
部署模式和落地策略
在一個已有的體系內(nèi)落地一個開源軟件,通常大概有兩種方式:
方式一:在開源軟件的基礎(chǔ)上做深度修改,很容易實現(xiàn)公司內(nèi)需要的定制功能。但和社區(qū)開源版本分道揚鑣,以后如何升級?
方式二:盡量不修改社區(qū)版本(或減少不兼容的修改),而是在它的外圍或者上層進一步包裝來實現(xiàn)公司內(nèi)部需要的定制功能。
注:上圖方式一的圖畫的比較極端,實際上很多公司是方式一、方式二結(jié)合的。 我們選擇了方式二。最早的時候,我們使用的是 4.5.2 版本,后來社區(qū) 4.7 版本大幅減小了同步復(fù)制的延遲,正好我們的部署模式就是同步復(fù)制,于是就很輕松的升級了 4.7 系列,享受了新版本的紅利。 在部署集群的時候,還會面臨很多部署策略的選擇:
- 大集群 vs 小集群
- 選擇副本數(shù)
- 同步刷盤 vs 異步刷盤
- 同步復(fù)制 vs 異步復(fù)制
- SSD vs 機械硬盤
大集群會有更好的性能彈性,而小集群具有更好的隔離型,此外小集群可以不需要跨可用區(qū) /IDC 部署,所以會有更好的健壯性 。我們非常看重穩(wěn)定性,因此選擇了小集群。集群同步復(fù)制異步刷盤,首選 SSD。
客戶端封裝策略
對外只提供最基本的 API,所有訪問必須經(jīng)過我們提供的接口。簡潔的 API 就像冰山的一個角,除了對外的簡單接口,下面所有的東西都可以升級更換,而不會破壞兼容性。 業(yè)務(wù)開發(fā)起來也很簡單,只要需要提供 Topic(全局唯一)和 Group 就可以生產(chǎn)和消費,不用提供環(huán)境、NameServer 地址等。SDK 內(nèi)部會根據(jù) Topic 解析出集群 NameServer 的地址,然后連接相應(yīng)的集群。生產(chǎn)環(huán)境和測試環(huán)境環(huán)境會解析出不同的地址,從而實現(xiàn)了隔離。 上圖分為 3 層,第二層是通用的,第三層才對應(yīng)具體的 MQ 實現(xiàn),因此,理論上可以更換為其它消息中間件,而客戶端程序不需要修改。 SDK 內(nèi)部集成了熱變更機制,可以在不重啟 Client 的情況下做動態(tài)配置,比如下發(fā)路由策略(更換集群 NameServer 的地址,或者連接到別的集群去),Client 的線程數(shù)、超時時間等。通過 Maven 強制更新機制,可以保證業(yè)務(wù)使用的 SDK 基本上是最新的。
集群負載均衡 & 機房災(zāi)備
生產(chǎn)者同時連接兩個集群,如果可用區(qū) A 出現(xiàn)故障,流量就會自動切換到可用區(qū) B 的集群 2 去。我們開發(fā)了一個小組件來實現(xiàn)自適應(yīng)的集群負載均衡,它包含以下能力:
-
千萬級 OPS
-
靈活的權(quán)重調(diào)整策略
-
健康檢查支持/事件通知
-
并發(fā)度控制(自動降低響應(yīng)慢的服務(wù)器的請求數(shù))
-
資源優(yōu)先級(類似 Envoy,實現(xiàn)本地機房優(yōu)先,或是被調(diào)服務(wù)器很多的時候選取一個子集來調(diào)用)
-
自動優(yōu)先級管理
- 增量熱變更
實際上它并不僅僅用于消息生產(chǎn)者,而是一個通用的主調(diào)方負載均衡類庫,可以在 Github 上找到: https://github.com/PhantomThief/simple-failover-java。
核心的 SimpleFailover 接口和 PriorityFailover 類沒有傳遞第三方依賴,非常容易整合。
多樣的消息功能
延遲消息
延遲消息是非常重要的業(yè)務(wù)功能,不過 RocketMQ 內(nèi)置的延遲消息只能支持幾個固定的延遲級別,所以我們又開發(fā)了單獨的 Delay Server 來調(diào)度延遲消息:
上圖這個結(jié)構(gòu)沒有直接將延遲消息發(fā)到 Delay Server,而是更換 Topic 以后存入 RocketMQ。這樣的好處是可以復(fù)用現(xiàn)有的消息發(fā)送接口(以及上面的所有擴展能力)。對業(yè)務(wù)來說,只需要在構(gòu)造消息的時候額外指定一個延遲時間字段即可,其它用法都不變。
事務(wù)消息
RocketMQ 4.3 版本以后支持了事務(wù)消息,可以保證本地事務(wù)和消費發(fā)送同時成功或者失敗,對于一些業(yè)務(wù)場景很有幫助。 事務(wù)消息的用法和原理有很多資料,這里就不細述了。但關(guān)于事務(wù)消息的實踐網(wǎng)上資料較少,我們可以給出一些建議。 首先,事務(wù)消息功能一直在不斷完善,應(yīng)該使用最新的版本,至少是 4.6.1 以后的版本,可以避免很多問題。 其次,事務(wù)消息性能是不如普通消息的,它在內(nèi)部實際上會生成 3 個消息(一階段 1 個,二階段 2 個),所以性能大約只有普通消息的 1/3,如果事務(wù)消息量大的話,要做好容量規(guī)劃?;夭檎{(diào)度線程也只有 1 個,不要用極限壓力去考驗它。 最后有一些參數(shù)注意事項。在 Broker 的配置中: - transientStorePoolEnable 這個參數(shù)必須保持默認值 false,否則會有嚴重的問題。
- endTransactionThreadPoolNums是事務(wù)消息二階段處理線程大小,sendMessageThreadPoolNums 則指定一階段處理線程池大小。如果二階段的處理速度跟不上一階段,就會造成二階段消息丟失導(dǎo)致大量回查,所以建議 endTransactionThreadPoolNums 應(yīng)該大于 sendMessageThreadPoolNums,建議至少 4 倍。
- useReentrantLockWhenPutMessage 設(shè)置為 true(默認值是 false),以免線程搶鎖出現(xiàn)嚴重的不公平,導(dǎo)致二階段處理線程長時間搶不到鎖。
- transactionTimeOut 默認值 6 秒太短了,如果事務(wù)執(zhí)行時間超過 6 秒,就可能導(dǎo)致消息丟失。建議改到 1 分鐘左右。
分布式對賬監(jiān)控
除了比較一些常規(guī)的監(jiān)控手段以外,我們開發(fā)了一個監(jiān)控程序做分布式對賬??梢园l(fā)現(xiàn)我們的集群以及我們提供的 SDK 是否有異常。
具體做法是在每個 Broker 上都建立一個監(jiān)控專用的 Topic,監(jiān)控程序使用我們自己提供的 SDK 框架來連接集群(就像我們的業(yè)務(wù)用戶那樣),監(jiān)控生產(chǎn)者會給每個集群發(fā)送少量消息。然后檢查發(fā)送是否成功:
發(fā)送成功 |
成功 |
刷盤超時 |
|
Slave 超時 |
|
Slave 不可用 |
|
發(fā)送失敗 |
具體錯誤碼 |
生產(chǎn)者只對這些結(jié)果進行打點,不判斷是否正常,具體到監(jiān)控(或者演練)場景可以配置不同的報警規(guī)則。 消費者收到了消息會通過 TCP 旁路 ACK 生產(chǎn)者,生產(chǎn)者這邊會做分布式對賬,將對賬結(jié)果打點:
- 收到消息
- 消息丟失(或超時未收到消息)
- 重復(fù)收到消息
- 消息生成到最終消費的時間差
- ACK 生產(chǎn)者失?。ㄓ上M者打點)
性能優(yōu)化
Broker 默認的參數(shù)在我們的場景下(SSD、同步復(fù)制、異步刷盤)不是最優(yōu)的,有的參數(shù)也許在大多數(shù)場景下都不是最優(yōu)的。我們列出一些重要的參數(shù),供大家參考:
參數(shù) | 默認值 | 說明 |
flushCommitLogTimed | False | 默認值不合理,異步刷盤這個參數(shù)應(yīng)該設(shè)置成 true,導(dǎo)致頻繁刷盤,對性能影響極大。 |
deleteWhen | 04 | 幾點刪除過期文件的時間,刪除文件時有很多磁盤讀,這個默認值是合理的,有條件的話還是建議低峰刪除。 |
sendMessageThreadPoolNums | 1 | 處理生產(chǎn)消息的線程數(shù),這個線程干的事情很多,建議設(shè)置為 2~4,但太多也沒有什么用。因為最終寫 commit log 的時候只有一個線程能拿到鎖。 |
useReentrantLockWhenPutMessage | False | 如果前一個參數(shù)設(shè)置比較大,這個最好設(shè)置為 true,避免高負載下自旋鎖空轉(zhuǎn)消耗 CPU。 |
sendThreadPoolQueueCapacity | 10000 | 處理生產(chǎn)消息的隊列大小,默認值可能有點小,比如 5 萬 TPS(異步發(fā)送)的情況下,卡 200ms 就會爆。設(shè)置比較小的數(shù)字可能是擔心有大量大消息撐爆內(nèi)存(比如 100K 的話, 1 萬個的消息大概占用 1G 內(nèi)存,也還好),具體可以自己算,如果都是小消息,可以把這個數(shù)字改大??梢孕薷?Broker 參數(shù)限制 Client 發(fā)送大消息。 |
brokerFastFailureEnable | True | Broker 端快速失?。ㄏ蘖鳎?,和下面兩個參數(shù)配合。這個機制可能有爭議,client 設(shè)置了超時時間,如果 client 還愿意等,并且 sendThreadPoolQueue 還沒有滿,不應(yīng)該失敗,sendThreadPoolQueue 滿了自然會拒絕新的請求。但如果 Client 設(shè)置的超時時間很短,沒有這個機制可能導(dǎo)致消息重復(fù)??梢宰孕袥Q定是否開啟。理想情況下,能根據(jù) Client 設(shè)置的超時時間來清理隊列是最好的。 |
waitTimeMillsInSendQueue | 200 | 200ms 很容易導(dǎo)致發(fā)送失敗,建議改大,比如 1000ms。 |
osPageCacheBusyTimeOutMills | 1000 | Page cache 超時時間,如果內(nèi)存比較多,比如 32G 以上,建議改大點。 |
總結(jié)
得益于簡單、幾乎 0 依賴的部署模式,使得我們部署小集群的成本非常低;不對社區(qū)版本進行魔改,保證我們可以及時升級;統(tǒng)一 SDK 入口方便集群維護和功能升級;通過復(fù)合小集群+自動負載均衡實現(xiàn)多機房多活;充分利用 RocketMQ 的功能,比如事務(wù)消息、延遲消息(增強)來滿足業(yè)務(wù)的多樣性需求;通過自動的分布式對賬,對每一個 Broker 以及我們的 SDK 進行正確性監(jiān)控。
本文也進行了一些性能參數(shù)的分享,但寫的比較簡單,基本只說了怎么調(diào),但沒能細說為什么,以后我們會另寫文章詳述。目前 RocketMQ 已經(jīng)應(yīng)用在公司在大多數(shù)業(yè)務(wù)線,期待將來會有更好的發(fā)展!
免責聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!