Flink 在快手應(yīng)用場景及規(guī)模
快手實時多維分析平臺
SlimBase-更省 IO、嵌入式共享 state 存儲
首先看 Flink 在快手的應(yīng)用場景和規(guī)模。
1. 快手應(yīng)用場景
快手計算鏈路是從 DB/Binlog 以及 WebService Log 實時入到 Kafka 中,然后接入 Flink 做實時計算,其中包括實時數(shù)倉、實時分析以及實時訓(xùn)練,最后的結(jié)果存到 Druid、Kudu、HBase 或者 ClickHouse 里面;同時 Kafka 數(shù)據(jù)實時 Dump 一份到 Hadoop 集群,然后通過 Hive、MapReduce 或者 Spark 來做離線計算;最終實時計算和離線計算的結(jié)果數(shù)據(jù)會用內(nèi)部自研 BI 工具 KwaiBI 來展現(xiàn)出來。
Flink 在快手典型的應(yīng)用場景主要分為三大類:
80% 統(tǒng)計監(jiān)控:實時統(tǒng)計,包括各項數(shù)據(jù)的指標,監(jiān)控項報警,用于輔助業(yè)務(wù)進行實時分析和監(jiān)控;
15% 數(shù)據(jù)處理:對數(shù)據(jù)的清洗、拆分、Join 等邏輯處理,例如大 Topic 的數(shù)據(jù)拆分、清洗;
5% 數(shù)據(jù)處理:實時業(yè)務(wù)處理,針對特定業(yè)務(wù)邏輯的實時處理,例如實時調(diào)度。
Flink 在快手應(yīng)用的典型場景案例包括:
快手是分享短視頻跟直播的平臺,快手短視頻、直播的質(zhì)量監(jiān)控是通過 Flink 進行實時統(tǒng)計,比如直播觀眾端、主播端的播放量、卡頓率、開播失敗率等跟直播質(zhì)量相關(guān)的多種監(jiān)控指標;
用戶增長分析,實時統(tǒng)計各投放渠道拉新情況,根據(jù)效果實時調(diào)整各渠道的投放量;
實時數(shù)據(jù)處理,廣告展現(xiàn)流、點擊流實時 Join,客戶端日志的拆分等;
直播 CDN 調(diào)度,實時監(jiān)控各 CDN 廠商質(zhì)量,通過 Flink 實時訓(xùn)練調(diào)整各個 CDN 廠商流量配比。
2. Flink 集群規(guī)模
快手目前集群規(guī)模有 1500 臺左右,日處理條目數(shù)總共有3萬億,峰值處理條目數(shù)大約是 3億/s 左右。集群部署都是 On Yarn 模式,實時集群和離線集群混合部署,通過 Yarn 標簽進行物理隔離,實時集群是 Flink 專用集群,針對隔離性、穩(wěn)定性要求極高的業(yè)務(wù)部署。注:本文所涉及數(shù)據(jù)僅代表嘉賓分享時的數(shù)據(jù)。
此處重點和大家分享下快手的實時多維分析平臺。
1. 快手實時多維分析場景
快手內(nèi)部有這樣的應(yīng)用場景,每天的數(shù)據(jù)量在百億級別,業(yè)務(wù)方需要在數(shù)據(jù)中任選五個以內(nèi)的維度組合進行全維的建模進而計算累計的 PV ( Page View 訪問量 )、UV ( Unique Visitor 獨立訪客 )、新增或者留存等這樣的指標,然后指標的計算結(jié)果要實時進行圖形化報表展示供給業(yè)務(wù)分析人員進行分析。
2. 方案選型
現(xiàn)在社區(qū)已經(jīng)有一些 OLAP 實時分析的工具,像 Druid 和 ClickHouse;目前快手采用的是 Flink+Kudu 的方案,在前期調(diào)研階段對這三種方案從計算能力、分組聚合能力、查詢并發(fā)以及查詢延遲四個方面結(jié)合實時多維查詢業(yè)務(wù)場景進行對比分析:
計算能力方面:多維查詢這種業(yè)務(wù)場景需要支持 Sum、Count 和 count distinct 等能力,而 Druid 社區(qū)版本不支持 count distinct,快手內(nèi)部版本支持數(shù)值類型、但不支持字符類型的 count distinct;ClickHouse 本身全都支持這些計算能力;Flink 是一個實時計算引擎,這些能力也都具備。
分組聚合能力方面:Druid 的分組聚合能力一般,ClickHouse 和 Flink 都支持較強的分組聚合能力。
查詢并發(fā)方面:ClickHouse 的索引比較弱,不能支持較高的查詢并發(fā),Druid 和 Flink 都支持較高的并發(fā)度,存儲系統(tǒng) Kudu,它也支持強索引以及很高的并發(fā)。
查詢延遲方面:Druid 和 ClickHouse 都是在查詢時進行現(xiàn)計算,而 Flink+Kudu 方案,通過 Flink 實時計算后將指標結(jié)果直接存儲到 Kudu 中,查詢直接從 Kudu 中查詢結(jié)果而不需要進行計算,所以查詢延遲比較低。
采用 Flink+Kudu 的方案主要思想是借鑒了 Kylin 的思路,Kylin 可以指定很多維度和指標進行離線的預(yù)計算然后將預(yù)計算結(jié)果存儲到 HBase 中;快手的方案是通過 Flink 實時計算指標,再實時地寫到 Kudu 里面。
3. 方案設(shè)計
實時多維分析的整體的流程為:用戶在快手自研的 BI 分析工具 KwaiBI 上配置 Cube 數(shù)據(jù)立方體模型,指定維度列和指標列以及基于指標做什么樣的計算;配置過程中選擇的數(shù)據(jù)表是經(jīng)過處理過后存儲在實時數(shù)倉平臺中的數(shù)據(jù)表;然后根據(jù)配置的計算規(guī)則通過 Flink 任務(wù)進行建模指標的預(yù)計算,結(jié)果存儲到 Kudu 中;最后 KwaiBI 從 Kudu 中查詢數(shù)據(jù)進行實時看板展示。
接下來詳細介紹一下實時多維分析的主要模塊。
① 數(shù)據(jù)預(yù)處理
KwaiBI 配置維度建模時選擇的數(shù)據(jù)表,是經(jīng)過提前預(yù)處理的:
首先內(nèi)部有一個元信息系統(tǒng),在元信息系統(tǒng)中提供統(tǒng)一的 schema 服務(wù),所有的信息都被抽象為邏輯表;
例如 Kafka 的 topic、Redis、HBase 表等元數(shù)據(jù)信息都抽取成 schema 存儲起來;
快手 Kafka 的物理數(shù)據(jù)格式大部分是 Protobuf 和 Json 格式,schema 服務(wù)平臺也支持將其映射為邏輯表;
用戶只需要將邏輯表建好之后,就可以在實時數(shù)倉對數(shù)據(jù)進行清洗和過濾。
② 建模計算指標
數(shù)據(jù)預(yù)處理完成后,最重要的步驟是進行建模指標計算,此處支持 Cube、GroupingSet 方式維度組合來計算小時或者天累計的 UV ( Unique Visitor )、新增和留存等指標,可以根據(jù)用戶配置按固定時間間隔定期輸出結(jié)果;維度聚合邏輯中,通過逐層降維計算的方式會讓 DAG 作業(yè)圖十分復(fù)雜,如上圖右上角模型所示;因此快手設(shè)計了兩層降維計算模型,分為全維度層和剩余維度層,這樣既利用了全維度層的聚合結(jié)果又簡化了 DAG 作業(yè)圖。
以 UV 類指標計算舉例,兩個黃色虛線框分別對應(yīng)兩層計算模塊:全維計算和降維計算。
全維計算分為兩個步驟,為避免數(shù)據(jù)傾斜問題,首先是維度打散預(yù)聚合,將相同的維度值先哈希打散一下。因為 UV 指標需要做到精確去重,所以采用 Bitmap 進行去重操作,每分鐘一個窗口計算出增量窗口內(nèi)數(shù)據(jù)的 Bitmap 發(fā)送給第二步按維度全量聚合;在全量聚合中,將增量的 Bitmap 合并到全量 Bitmap 中最終得出準確的 UV 值。然而有人會有問題,針對用戶 id 這種的數(shù)值類型的可以采用此種方案,但是對于 deviceid 這種字符類型的數(shù)據(jù)應(yīng)該如何處理?實際上在源頭,數(shù)據(jù)進行維度聚合之前,會通過字典服務(wù)將字符類型的變量轉(zhuǎn)換為唯一的 Long 類型值,進而通過 Bitmap 進行去重計算 UV。
降維計算中,通過全維計算得出的結(jié)果進行預(yù)聚合然后進行全量聚合,最終將結(jié)果進行輸出。
再重點介紹下,建模指標計算中的幾個關(guān)鍵點。在建模指標計算中,為了避免維度數(shù)據(jù)傾斜問題,通過預(yù)聚合 ( 相同維度 hash 打散 ) 和全量聚合 ( 相同維度打散后聚合 ) 兩種方式來解決;為了解決 UV 精確去重問題,前文有提到,使用 Bitmap 進行精確去重,通過字典服務(wù)將 String 類型數(shù)據(jù)轉(zhuǎn)換成 Long 類型數(shù)據(jù)進而便于存儲到 Bitmap 中,因為統(tǒng)計 UV 要統(tǒng)計歷史的數(shù)據(jù),比如說按天累計,隨著時間的推移,Bitmap 會越來越大,在 Rocksdb 狀態(tài)存儲下,讀寫過大的 KV 會比較耗性能,所以內(nèi)部自定義了一個 BitmapState,將 Bitmap 進行分塊存儲,一個 blockid 對應(yīng)一個局部的 bitmap,這樣在 RocksDB 中存儲時,一個 KV 會比較小,更新的時候也只需要根據(jù) blockid 更新局部的 bitmap 就可以而不需要全量更新。
接下來,看新增類的指標計算,和剛剛 UV 的不同點是需要判斷是否為新增用戶,通過異步地訪問外部的歷史用戶服務(wù)進行新增用戶判斷,再根據(jù)新增用戶流計算新增 UV,這塊計算邏輯和 UV 計算一致。
然后,再來看留存類指標計算,與 UV 計算不同的時候,不僅需要當(dāng)天的數(shù)據(jù)還需要前一天的歷史數(shù)據(jù),這樣才能計算出留存率,內(nèi)部實現(xiàn)的時候是采用雙 buffer state 存儲,在計算的時候?qū)㈦p buffer 數(shù)據(jù)相除就可以計算出留存率。
③ Kudu 存儲
最后經(jīng)過上面的計算邏輯后,會將結(jié)果存儲到 Kudu 里面,其本身具有低延遲隨機讀寫以及快速列掃描等特點,很適合實時交互分析場景;在存儲方式上,首先對維度進行編碼,然后按時間+維度組合+維度值組合作為主鍵,最終按維度組合、維度值組合、時間進行分區(qū),這樣有利于提高查詢的效率快速獲取到數(shù)據(jù)。
4. KwaiBI 展示
界面為配置 Cube 模型的截圖,配置一些列并指定類型,再通過一個 SQL 語句來描述指標計算的邏輯,最終結(jié)果也會通過 KwaiBI 展示出來。
SlimBase
更省 IO、嵌入式共享 state 存儲
接下來介紹一種比 RocksDB 更省 IO、嵌入式的共享 state 存儲引擎:SlimBase。
1. 面臨的挑戰(zhàn)
首先看一下 Flink 使用 RocksDB 遇到的問題,先闡述一下快手的應(yīng)用場景、廣告展現(xiàn)點擊流實時 Join 場景:打開快手 App 可能會收到廣告服務(wù)推薦的廣告視頻,用戶可能會點擊展現(xiàn)的廣告視頻。這樣的行為在后端會形成兩份數(shù)據(jù)流,一份是廣告展現(xiàn)日志,一份是客戶端點擊日志。這兩份數(shù)據(jù)進行實時 Join,并將 Join 結(jié)果作為樣本數(shù)據(jù)用于模型訓(xùn)練,訓(xùn)練出的模型會被推送到線上的廣告服務(wù)。該場景下展現(xiàn)以后20分鐘的點擊被認為是有效點擊,實時 Join 邏輯則是點擊數(shù)據(jù) Join 過去20分鐘內(nèi)的展現(xiàn)。其中,展現(xiàn)流的數(shù)據(jù)量相對比較大,20分鐘數(shù)據(jù)在 1TB 以上。檢查點設(shè)置為五分鐘,Backend 選擇 RocksDB。
在這樣的場景下,面臨著磁盤 IO 開銷70%,其中50%開銷來自于 Compaction;在 Checkpoint 期間,磁盤 IO 開銷達到了100%,耗時在1~5分鐘,甚至?xí)L于 Checkpoint 間隔,業(yè)務(wù)能明顯感覺到反壓。經(jīng)過分析找出問題:
首先,在 Checkpoint 期間會產(chǎn)生四倍的大規(guī)模數(shù)據(jù)拷貝,即:從 RocksDB 中全量讀取出來然后以三副本形式寫入到 HDFS 中;
其次,對于大規(guī)模數(shù)據(jù)寫入,RocksDB 的默認 Level Compaction 會有嚴重的 IO 放大開銷。
2. 解決方案
由于出現(xiàn)上文闡述的問題,開始尋找解決方案,整體思路是在數(shù)據(jù)寫入時直接落地到共享存儲中,避免 Checkpoint 帶來的數(shù)據(jù)拷貝問題。手段是嘗試使用更省 IO 的 Compaction,例如使用 SizeTieredCompation 方式,或者利用時序數(shù)據(jù)的特點使用并改造 FIFOCompaction。綜合比較共享存儲、SizeTieredCompation、基于事件時間的 FIFOCompaction 以及技術(shù)棧四個方面得出共識:HBase 代替 RocksDB 方案。
-
共享存儲方面,HBase 支持, RocksDB 不支持 -
SizeTieredCompation 方面,RocksDB 默認不支持,但 HBase 默認支持,開發(fā)起來比較簡單 -
基于事件時間下推的 FIFOCompaction 方面,RocksDB 不支持,但 HBase 開發(fā)起來比較簡單 -
技術(shù)棧方面,RocksDB 使用 C++,HBase 使用 java,HBase 改造起來更方便
但是 HBase 有些方面相比 RocksDB 較差:
HBase 是一個依賴 zookeeper、包含 Master 和 RegionServer 的重量級分布式系統(tǒng);而 RocksDB 僅是一個嵌入式的 Lib 庫,很輕量級。
在資源隔離方面,HBase 比較困難,內(nèi)存和 cpu 被多個 Container 共享;而 RocksDB 比較容易,內(nèi)存和 cpu 伴隨 Container 天生隔離。
網(wǎng)絡(luò)開銷方面,因為 HBase 是分布式的,所有比嵌入式的 RocksDB 開銷要大很多。
綜合上面幾點原因,快手達成了第二個共識,將 HBase 瘦身,改造為嵌入式共享存儲系統(tǒng)。
3. 實現(xiàn)方案
接下來介紹一下將 HBase 改造成 SlimBase 的實現(xiàn)方案,主要是分為兩層:
一層是 SlimBase 本身,包含三層結(jié)構(gòu):Slim HBase、適配器以及接口層;
另一層是 SlimBaseStateBackend,主要包含 ListState、MapState、ValueState 和 ReduceState。
后面將從 HBase 瘦身、適配并實現(xiàn)操作接口以及實現(xiàn) SlimBaseStateBackend 三個步驟分別進行詳細介紹。
① HBase 瘦身
先講 HBase 瘦身,主要從減肥和增瘦兩個步驟,在減肥方面:
先對 HBase 進行減裁,去除 client、zookeeper 和 master,僅保留 RegionServer
再對 RegionServer 進行剪裁,去除 ZK Listener、Master Tracker、Rpc、WAL 和 MetaTable
僅保留 RegionServer 中的 Cache、Memstore、Compaction、Fluster 和 Fs
在增瘦方面:
將原來 Master 上用于清理 Hfile 的 HFileCleaner 遷移到 RegionServer 上
RocksDB 支持讀放大寫的 merge 接口,但是 SlimBase 是不支持的,所以要實現(xiàn) merge 的接口
接口層主要有以下三點實現(xiàn):
仿照 RocksDB,邏輯視圖分為兩級:DB 和 ColumnFamily
支持一些基本的接口:put/get/delete/merge 和 snapshot
額外支持了 restore 接口,用于從 snapshot 中恢復(fù)
適配層主要有以下兩個概念:
一個 SlimBase 適配為 Hbase 的 namespace
一個 SlimBase 的 ColumnFamily 適配為 HBase 的 table
SlimBaseStateBackend 實現(xiàn)上主要體現(xiàn)在兩個方面:
一是多種 States 實現(xiàn),支持多種數(shù)據(jù)結(jié)構(gòu),ListState、MapState、ValueState 和 ReduceState
二是改造 Snapshot 和 Restore 的流程,從下面的兩幅圖可以看出,SlimBase 在磁盤 IO 上節(jié)省了大量的資源,避免了多次的 IO 的問題。
4. 測試結(jié)論
上線對比測試后,得出測試結(jié)論:
Checkpoint 和 Restore 的時延從分鐘級別降到秒級。
磁盤 IO 下降了66%
磁盤寫吞吐下降50%
CPU 開銷下降了33%
5. 后期優(yōu)化
目前用的 Compaction 策略是 SizeTieredCompaction,后期要實現(xiàn)基于 OldestUnexpiredTime 的 FiFOCompaction 策略,目標是做到無磁盤 IO 開銷。
FiFOCompaction 是一種基于 TTL 的無 IO 的 Compaction 策略;OldestUnexpiredTime 是指例如設(shè)置 OldestUnexpiredTime=t2,表示 t2 時刻前的數(shù)據(jù)全部過期,可以被 Compaction 清理,基于時間點的 FIFOCompaction 理論上可以做到無磁盤 IO 開銷。
后續(xù)還有四點優(yōu)化,前三點是基于 HBase 的優(yōu)化,最后是針對 HDFS 做的優(yōu)化:
SlimBase 使用 InMemoryCompaction,降低內(nèi)存 Flush 和 Compaction 開銷
SlimBase 支持 prefixBloomFilter,提高 Scan 性能
SlimBase 支持短路讀
HDFS 副本落盤改造:非本地副本使用 DirectIO 直接落盤,提高本地讀 pagecache 命中率;此條主要是在測試使用時發(fā)現(xiàn)單副本比多副本讀寫效率高這一問題
6. 未來規(guī)劃
從語言、存儲、壓縮策略、事件事件下推、垃圾回收、檢查點時間、重加載時間七個方面來看,SlimBase 都比 RocksDB 更適合快手實時計算任務(wù)的開發(fā),未來的規(guī)劃是對 SlimBase 的性能做進一步優(yōu)化,愿景是將快手 Flink 上的所有業(yè)務(wù)場景全部用 SlimBase 替代掉 RocksDB。
分享嘉賓:
董亭亭,快手實時計算引擎團隊負責(zé)人。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!