唯品會(huì)億級(jí)數(shù)據(jù)服務(wù)平臺(tái)落地實(shí)踐
數(shù)據(jù)服務(wù)是數(shù)據(jù)中臺(tái)體系中的關(guān)鍵組成部分。作為數(shù)倉(cāng)對(duì)接上層應(yīng)用的統(tǒng)一出入口,數(shù)據(jù)服務(wù)將數(shù)倉(cāng)當(dāng)作一個(gè)統(tǒng)一的 DB 來(lái)訪問(wèn),提供統(tǒng)一的 API 接口控制數(shù)據(jù)的流入及流出,能夠滿足用戶對(duì)不同類型數(shù)據(jù)的訪問(wèn)需求。
電商平臺(tái)唯品會(huì)的數(shù)據(jù)服務(wù)自 2019 年開始建設(shè),在公司內(nèi)經(jīng)歷了從無(wú)到有落地,再到為超過(guò) 30 業(yè)務(wù)方提供 to B、to C 的數(shù)據(jù)服務(wù)的過(guò)程。本文主要介紹唯品會(huì)自研數(shù)據(jù)服務(wù) Hera 的相關(guān)背景、架構(gòu)設(shè)計(jì)和核心功能。
背景
在統(tǒng)一數(shù)倉(cāng)數(shù)據(jù)服務(wù)之前,數(shù)倉(cāng)提供的訪問(wèn)接入方式往往存在效率問(wèn)題低、數(shù)據(jù)指標(biāo)難統(tǒng)一等問(wèn)題,具體而言有以下幾個(gè)比較突出的情況:- 廣告人群 USP、DMP 系統(tǒng)每天需要通過(guò) HiveServer 以流的方式從數(shù)倉(cāng)導(dǎo)出數(shù)據(jù)到本地,每個(gè)人群的數(shù)據(jù)量從幾十萬(wàn)到幾個(gè)億,人群數(shù)量 2w ,每個(gè)人群運(yùn)行時(shí)間在 30min ,部分大人群的運(yùn)行直接超過(guò) 1h,在資源緊張的情況下,人群延遲情況嚴(yán)重。
- 數(shù)倉(cāng)的數(shù)據(jù)在被數(shù)據(jù)產(chǎn)品使用時(shí),需要為每個(gè)表新生成一個(gè)單獨(dú)的接口,應(yīng)用端需要為每一種訪問(wèn)方式(如 Presto、ClickHouse)區(qū)分使用不同的接口,導(dǎo)致數(shù)據(jù)產(chǎn)品接口暴漲,不方便維護(hù),影響開發(fā)及維護(hù)效率。數(shù)據(jù)在不同的存儲(chǔ)時(shí),需要包含 clickhouse-client,presto-client 等等第三方 jar 包。
- 不同數(shù)據(jù)產(chǎn)品中都需要使用一些常用的數(shù)據(jù)指標(biāo),如銷售額、訂單數(shù)、PV、UV 等,而這些數(shù)據(jù)在不同數(shù)據(jù)產(chǎn)品的實(shí)現(xiàn)口徑、實(shí)現(xiàn)方式都不一樣,無(wú)法形成數(shù)據(jù)共享,每個(gè)數(shù)據(jù)產(chǎn)品都重復(fù)進(jìn)行相同的指標(biāo)建設(shè)。因此,在不同數(shù)據(jù)產(chǎn)品查看相同指標(biāo)卻發(fā)現(xiàn)數(shù)值不同的情況下,難以判斷哪個(gè)數(shù)據(jù)產(chǎn)品提供的數(shù)據(jù)是準(zhǔn)確的。
圖 1.在統(tǒng)一數(shù)倉(cāng)數(shù)據(jù)服務(wù)之前,數(shù)據(jù)流入流出方式
為解決以上問(wèn)題,數(shù)據(jù)服務(wù)應(yīng)運(yùn)而生。目前數(shù)據(jù)服務(wù)的主要優(yōu)勢(shì)有:屏蔽底層的存儲(chǔ)引擎、計(jì)算引擎,使用同一個(gè) API(one service),數(shù)倉(cāng)數(shù)據(jù)分層存儲(chǔ),不同 engine 的 SQL 生成能力,自適應(yīng) SQL 執(zhí)行以及統(tǒng)一緩存架構(gòu)保障業(yè)務(wù) SLA,支持?jǐn)?shù)據(jù)注冊(cè)并授權(quán)給任何調(diào)用方進(jìn)行使用,提高數(shù)據(jù)交付效率。
通過(guò)唯一的 ID 標(biāo)識(shí),數(shù)據(jù)產(chǎn)品可通過(guò) ID 查閱數(shù)據(jù),而非直接訪問(wèn)對(duì)應(yīng)的數(shù)倉(cāng)表。一方面,指標(biāo)服務(wù)統(tǒng)一了指標(biāo)的口徑,同時(shí)也支持快速構(gòu)建新的數(shù)據(jù)產(chǎn)品。
架構(gòu)設(shè)計(jì)
數(shù)據(jù)服務(wù)能給業(yè)務(wù)帶來(lái)運(yùn)營(yíng)和商業(yè)價(jià)值,核心在于給用戶提供自助分析數(shù)據(jù)能力。Hera 整體架構(gòu)基于典型的 Master/slave 模型,數(shù)據(jù)流與控制流單獨(dú)鏈路,從而保障系統(tǒng)的高可用性。數(shù)據(jù)服務(wù)系統(tǒng)主要分為三層:- 應(yīng)用接入層:業(yè)務(wù)申請(qǐng)接入時(shí),可以根據(jù)業(yè)務(wù)要求選擇數(shù)據(jù)服務(wù) API(TCP Client), HTTP 以及 OSP 服務(wù)接口(公司內(nèi)部 RPC 框架)。
- 數(shù)據(jù)服務(wù)層:主要執(zhí)行業(yè)務(wù)提交的任務(wù),并返回結(jié)果。主要功能點(diǎn)包括:路由策略,多引擎支持,引擎資源配置,引擎參數(shù)動(dòng)態(tài)組裝,SQLLispengine 生成,SQL 自適應(yīng)執(zhí)行,統(tǒng)一數(shù)據(jù)查詢緩存,F(xiàn)reeMaker SQL 動(dòng)態(tài)生成等功能。
- 數(shù)據(jù)層:業(yè)務(wù)查詢的數(shù)據(jù)無(wú)論在數(shù)倉(cāng)、Clickhouse、MySQL 還是 Redis 中,都可以很好地得到支持,用戶都使用同一套 API。
圖 2. 數(shù)據(jù)服務(wù)整體架構(gòu)圖
調(diào)度系統(tǒng)的整體流程大致包含以下模塊:
- Master:負(fù)責(zé)管理所有的 Worker、TransferServer、AdhocWorker 節(jié)點(diǎn),同時(shí)負(fù)責(zé)調(diào)度分發(fā)作業(yè);
- Worker:負(fù)責(zé)執(zhí)行 ETL 和數(shù)據(jù)文件導(dǎo)出類型的作業(yè),拉起 AdhocWorker 進(jìn)程(Adhoc 任務(wù)在 AdhocWorker 進(jìn)程中的線程池中執(zhí)行),ETL 類型的作業(yè)通過(guò)子進(jìn)程的方式完成;
- Client:客戶端,用于編程式地提交 SQL 作業(yè);
- ConfigCenter:負(fù)責(zé)向集群推送統(tǒng)一配置信息及其它運(yùn)行時(shí)相關(guān)的配置和 SQLParser (根據(jù)給定的規(guī)則解析、替換、生成改寫 SQL 語(yǔ)句,以支持不同計(jì)算引擎的執(zhí)行);
- TransferServer:文件傳輸服務(wù)。
圖 3. 數(shù)據(jù)服務(wù)調(diào)度流程圖
主要功能
Hera 數(shù)據(jù)服務(wù)的主要功能有:多隊(duì)列調(diào)度策略、多引擎查詢、多任務(wù)類型、文件導(dǎo)出、資源隔離、引擎參數(shù)動(dòng)態(tài)組裝、自適應(yīng) Engine 執(zhí)行和 SQL 構(gòu)建。多隊(duì)列調(diào)度策略
數(shù)據(jù)服務(wù)支持按照不同用戶、不同任務(wù)類型并根據(jù)權(quán)重劃分不同調(diào)度隊(duì)列,以滿足不同任務(wù)類型的 SLA。多引擎查詢
數(shù)據(jù)服務(wù)支持目前公司內(nèi)部所有 OLAP 和數(shù)據(jù)庫(kù)類型,包括 Spark、Presto、Clickhouse、Hive 、MySQL、Redis。會(huì)根據(jù)業(yè)務(wù)具體場(chǎng)景和要求,選擇當(dāng)前最佳的查詢引擎。多任務(wù)類型
數(shù)據(jù)服務(wù)支持的任務(wù)類型有:ETL、Adhoc、文件導(dǎo)出、數(shù)據(jù)導(dǎo)入。加上多引擎功能,實(shí)現(xiàn)多種功能組合,如 Spark adhoc 和 Presto adhoc。文件導(dǎo)出
主要是支持大量的數(shù)據(jù)從數(shù)據(jù)倉(cāng)庫(kù)中導(dǎo)出,便于業(yè)務(wù)分析和處理,比如供應(yīng)商發(fā)券和信息推送等。具體執(zhí)行過(guò)程如下:用戶提交需要導(dǎo)出數(shù)據(jù)的 SQL,通過(guò)分布式 engine 執(zhí)行完成后,落地文件到 hdfs/alluxio. 客戶端通過(guò) TCP 拉取文件到本地。千萬(wàn)億級(jí)的數(shù)據(jù)導(dǎo)出耗時(shí)最多 10min。數(shù)據(jù)導(dǎo)出在人群數(shù)據(jù)導(dǎo)出上性能由原來(lái)的 30min ,提升到最多不超過(guò) 3min,性能提升 10~30 倍。具體流程如下:
圖 4. 數(shù)據(jù)服務(wù)文件下載流程圖
資源隔離(Worker 資源和計(jì)算資源)
業(yè)務(wù)一般分為核心和非核心,在資源分配和調(diào)度上也不同。主要是從執(zhí)行任務(wù) Worker 和引擎資源,都可以實(shí)現(xiàn)物理級(jí)別的隔離,最大化減少不同業(yè)務(wù)之間相互影響。引擎參數(shù)動(dòng)態(tài)組裝
線上業(yè)務(wù)執(zhí)行需要根據(jù)業(yè)務(wù)情況進(jìn)行調(diào)優(yōu),動(dòng)態(tài)限制用戶資源使用,集群整體切換等操作,這個(gè)時(shí)候就需要對(duì)用戶作業(yè)參數(shù)動(dòng)態(tài)修改,如 OLAP 引擎執(zhí)行任務(wù)時(shí),經(jīng)常都要根據(jù)任務(wù)調(diào)優(yōu),設(shè)置不同參數(shù)。針對(duì)這類問(wèn)題,數(shù)據(jù)服務(wù)提供了根據(jù)引擎類型自動(dòng)組裝引擎參數(shù),并且引擎參數(shù)支持動(dòng)態(tài)調(diào)整,也可以針對(duì)特定任務(wù)、執(zhí)行賬號(hào)、業(yè)務(wù)類型來(lái)設(shè)定 OLAP 引擎執(zhí)行參數(shù)。自適應(yīng) Engine 執(zhí)行
業(yè)務(wù)方在查詢時(shí),有可能因?yàn)橐尜Y源不足或者查詢條件數(shù)據(jù)類型不匹配從而導(dǎo)致執(zhí)行失敗。為了提高查詢成功率和服務(wù) SLA 保障,設(shè)計(jì)了 Ad Hoc 自適應(yīng)引擎執(zhí)行,當(dāng)一個(gè)引擎執(zhí)行報(bào)錯(cuò)后,會(huì)切換到另外一個(gè)引擎繼續(xù)執(zhí)行。具體自適應(yīng)執(zhí)行邏輯如下圖所示:圖 5. 自適應(yīng) Engine 執(zhí)行
SQL 構(gòu)建
數(shù)據(jù)服務(wù) SQL 構(gòu)建基于維度事實(shí)建模,支持單表模型、星型模型和雪花模型。- 單表模型:一張事實(shí)表,一般為 DWS 或者 ADS 的匯總事實(shí)表。
- 星型模型:1 張事實(shí)表(如 DWD 明細(xì)事實(shí)表) N 張維表,例如訂單明細(xì)表 (事實(shí)表 FK=商品 ID) ? 商品維表 (維度表 PK=商品 ID)?。
- 雪花模型:1 張事實(shí)表(如 DWD 明細(xì)事實(shí)表) N 張維表 M 張沒(méi)有直接連接到事實(shí)表的維表,例如訂單明細(xì)表 (事實(shí)表 FK=商品 ID) ? 商品維表 (維度表 PK=商品 ID,F(xiàn)K=品類 ID)? ? 品類維表(維度表 PK=品類 ID)。
圖 6.SQL 維度模型
自定義語(yǔ)法(Lisp)描述指標(biāo)的計(jì)算公式
Lisp 是一套自定義的語(yǔ)法,用戶可以使用 Lisp 來(lái)描述指標(biāo)的計(jì)算公式。其目標(biāo)是為了構(gòu)建統(tǒng)一的指標(biāo)計(jì)算公式處理范式,屏蔽底層的執(zhí)行引擎的語(yǔ)法細(xì)節(jié),最大化優(yōu)化業(yè)務(wù)配置和生成指標(biāo)的效率。Lisp 總體格式 (oprator param1 param2 ...) param 可以是一個(gè)參數(shù),也可以是一個(gè) Lisp 表達(dá)式。目前已經(jīng)實(shí)現(xiàn)的功能:
- 聚合表達(dá)式
在 Presto 中的實(shí)現(xiàn)是 approx_distinct(x,e) over (partition by y,z),在 Spark 中的實(shí)現(xiàn)是 approx_count_distinct(x,e) over (partition by y,z)。y,z 只在開窗函數(shù)模式下才生效。目前也支持嵌套的聚合表達(dá)式(sum (sum (max x)))。
- 條件表達(dá)式
簡(jiǎn)單模式 ?(case value val1 then1?[val2 then2] ...?[elseVal])eg:(case subject_id (int 2) (int 1)) ->? case subject_id when 2 then 1 end)
查找模式 ?(case when1 then1 [when2 then2] ... [elseVal])eg:(case (= subject_id (string goods_base)) (int 2) (int 1)) ->? case when subject_id = 'goods_base' then 2 else 1 end
- 類型標(biāo)識(shí)表達(dá)式
- 類型轉(zhuǎn)換表達(dá)式
- 聚合通用表達(dá)式
- 非聚合通用表達(dá)式
例如:(func_none json_extract_scalar 40079 '$.m_name' )
Lisp 語(yǔ)法的解析
Lisp 的解析和翻譯是基于 antlr4 來(lái)實(shí)現(xiàn)的,處理流程如下:圖 7. Lisp 處理流程圖
- 將 Lisp(count x y)表達(dá)式通過(guò) antlr 翻譯成語(yǔ)法樹,如下圖所示:
圖 8. 語(yǔ)法樹
- 通過(guò)自定義的 Listener 遍歷語(yǔ)法樹
- 在遍歷語(yǔ)法樹的過(guò)程中,結(jié)合指標(biāo)的 query engine(presto/spark/clickhouse/mysql)元數(shù)據(jù)生成對(duì)應(yīng)的查詢引擎的 SQL 代碼(approx_distinct(x,e) over (partition by y))
任務(wù)調(diào)度
基于 Netty 庫(kù)收發(fā)集群消息,系統(tǒng)僅僅使用同一個(gè)線程池對(duì)象 EventLoopGroup 來(lái)收發(fā)消息,而用戶的業(yè)務(wù)邏輯,則交由一個(gè)單獨(dú)的線程池。選用 Netty 的另外一個(gè)原因是“零拷貝”的能力,在大量數(shù)據(jù)返回時(shí),通過(guò)文件的形式直接將結(jié)果送給調(diào)用者。
多隊(duì)列 多用戶調(diào)度
業(yè)務(wù)需求通常包含時(shí)間敏感與不敏感作業(yè),為了提高作業(yè)的穩(wěn)定性和系統(tǒng)的可配置性,Hera 提供了多隊(duì)列作業(yè)調(diào)度的功能。用戶在提交作業(yè)時(shí)可以顯式地指定一個(gè)作業(yè)隊(duì)列名,當(dāng)這個(gè)作業(yè)在提交到集群時(shí),如果相應(yīng)的隊(duì)列有空閑,則就會(huì)被添加進(jìn)相應(yīng)的隊(duì)列中,否則返回具體的錯(cuò)誤給客戶端,如任務(wù)隊(duì)列滿、隊(duì)列名不存在、隊(duì)列已經(jīng)關(guān)閉等,客戶端可以選擇“是否重試提交”。
當(dāng)一個(gè)作業(yè)被添加進(jìn)隊(duì)列之后,Master 就會(huì)立即嘗試調(diào)度這個(gè)隊(duì)列中的作業(yè),基于以下條件選擇合適的作業(yè)運(yùn)行:
- 每個(gè)隊(duì)列都有自己的權(quán)重,同時(shí)會(huì)設(shè)置占用整個(gè)集群的資源總量,如最多使用多少內(nèi)存、最多運(yùn)行的任務(wù)數(shù)量等。
- 隊(duì)列中的任務(wù)也有自己的權(quán)重,同時(shí)會(huì)記錄這個(gè)作業(yè)入隊(duì)的時(shí)間,在排序當(dāng)前隊(duì)列的作業(yè)時(shí),利用入隊(duì)的時(shí)間偏移量和總的超時(shí)時(shí)間,計(jì)算得到一個(gè)最終的評(píng)分。
- 除了調(diào)度系統(tǒng)本身的調(diào)度策略外,還需要考慮外部計(jì)算集群的負(fù)載,在從某個(gè)隊(duì)列中拿出一個(gè)作業(yè)后,再進(jìn)行一次過(guò)濾,或者是先過(guò)濾,再進(jìn)行作業(yè)的評(píng)分計(jì)算。
一個(gè)可用的計(jì)算作業(yè)評(píng)分模型如下:
隊(duì)列動(dòng)態(tài)因子 = 隊(duì)列大小 / 隊(duì)列容量 * (1 - 作業(yè)運(yùn)行數(shù) / 隊(duì)列并行度)
這個(gè)等式表示的意義是:如果某個(gè)隊(duì)列正在等待的作業(yè)的占比比較大,同時(shí)并行運(yùn)行的作業(yè)數(shù)占比也比較大時(shí),這個(gè)隊(duì)列的作業(yè)就擁有一個(gè)更大的因子,也就意味著在隊(duì)列權(quán)重相同時(shí),這個(gè)隊(duì)列中的作業(yè)應(yīng)該被優(yōu)先調(diào)度。
作業(yè)權(quán)重 = 1 - (當(dāng)前時(shí)間-入隊(duì)時(shí)間) / 超時(shí)時(shí)間
這個(gè)等式表示的意義是:在同一個(gè)隊(duì)列中,如果一個(gè)作業(yè)的剩余超時(shí)時(shí)間越少,則意味著此作業(yè)將更快達(dá)到超時(shí),因此它應(yīng)該獲得更大的選擇機(jī)會(huì)。
score = 作業(yè)權(quán)重 隊(duì)列動(dòng)態(tài)因子 隊(duì)列權(quán)重
這個(gè)等式表示的意義是:對(duì)于所有的隊(duì)列中的所有任務(wù),首先決定一個(gè)作業(yè)是否優(yōu)先被調(diào)度的因子是設(shè)置的隊(duì)列權(quán)重,例如權(quán)重為 10 的隊(duì)列的作業(yè),應(yīng)該比權(quán)重為 1 的隊(duì)列中的作業(yè)被優(yōu)先調(diào)度,而不管作業(yè)本身的權(quán)重(是否會(huì)有很大的機(jī)率超時(shí));其次影響作業(yè)調(diào)度優(yōu)先級(jí)的因子是隊(duì)列動(dòng)態(tài)因子,例如有兩個(gè)相同權(quán)重的隊(duì)列時(shí),如果一個(gè)隊(duì)列的動(dòng)態(tài)因子為 0.5,另外一個(gè)隊(duì)列的動(dòng)態(tài)因子是 0.3,那么應(yīng)該優(yōu)先選擇動(dòng)態(tài)因子為 0.5 的隊(duì)列作業(yè)進(jìn)行調(diào)度,而不管作業(yè)本身的權(quán)重;最后影響作業(yè)調(diào)度優(yōu)先級(jí)的因子是作業(yè)權(quán)重,例如在同一個(gè)隊(duì)列中,有兩個(gè)權(quán)重分別為 0.2 和 0.5 的作業(yè),那么為了避免更多的作業(yè)超時(shí),權(quán)重為 0.2 的作業(yè)應(yīng)該被優(yōu)先調(diào)度。
簡(jiǎn)單描述作業(yè)的排序過(guò)程就是,首先按隊(duì)列權(quán)重排序所有的隊(duì)列;對(duì)于有重復(fù)的隊(duì)列,則會(huì)計(jì)算每個(gè)隊(duì)列的動(dòng)態(tài)因子,并按此因子排序;對(duì)于每一個(gè)隊(duì)列,作業(yè)的排序規(guī)則按作業(yè)的超時(shí)比率來(lái)排序;最終依次按序遍歷每一個(gè)隊(duì)列,嘗試從中選擇足夠多的作業(yè)運(yùn)行,直到作業(yè)都被運(yùn)行或是達(dá)到集群限制條件。這里說(shuō)足夠多,是指每一個(gè)隊(duì)列都會(huì)有一個(gè)最大的并行度和最大資源占比,這兩個(gè)限制隊(duì)列的參數(shù)組合,是為了避免因某一個(gè)隊(duì)列的容量和并行度被設(shè)置的過(guò)大,可能超過(guò)了整個(gè)集群,導(dǎo)致其它隊(duì)列被“餓死”的情況。
SQL 作業(yè)流程
用戶通過(guò) Client 提交原始 SQL,這里以 Presto SQL 為例,Client 在提交作業(yè)時(shí),指定了 SQL 路由,則會(huì)首先通過(guò)訪問(wèn) SQLParser 服務(wù),在發(fā)送給 Master 之前,會(huì)首先提交 SQL 語(yǔ)句到 SQLParser 服務(wù)器,將 SQL 解析成后端計(jì)算集群可以支持的 SQL 語(yǔ)句,如 Spark、Presto、ClickHouse 等,為了能夠減少 RPC 交互次數(shù),SQLParser 會(huì)一次返回所有可能被改寫的 SQL 語(yǔ)句。在接收到 SQLParser 服務(wù)返回的多個(gè)可能 SQL 語(yǔ)句后,就會(huì)填充當(dāng)前的作業(yè)對(duì)象,真正開始向 Master 提交運(yùn)行。
Master 在收到用戶提交的作業(yè)后,會(huì)根據(jù)一定的調(diào)度策略,最終將任務(wù)分發(fā)到合適的 Worker 上,開始執(zhí)行。Worker 會(huì)首先采用 SQL 作業(yè)默認(rèn)的執(zhí)行引擎,比如 Presto,提交到對(duì)應(yīng)的計(jì)算集群運(yùn)行,但如果因?yàn)槟撤N原因不能得到結(jié)果,則會(huì)嘗試使用其它的計(jì)算引擎進(jìn)行計(jì)算。當(dāng)然這里也可以同時(shí)向多個(gè)計(jì)算集群提交作業(yè),一旦某個(gè)集群首先返回結(jié)果時(shí),就取消所有其它的作業(yè),不過(guò)這需要其它計(jì)算集群的入口能夠支持取消操作。
當(dāng) SQL 作業(yè)完成后,將結(jié)果返回到 Worker 端,為了能夠更加高效地將查詢結(jié)果返回給 Client 端,Worker 會(huì)從 Master 發(fā)送的任務(wù)對(duì)象中提取 Client 側(cè)信息,并將結(jié)果直接發(fā)送給 Client,直到收到確認(rèn)信息,至此整個(gè)任務(wù)才算執(zhí)行完畢。
在整個(gè)作業(yè)的流轉(zhuǎn)過(guò)程中,會(huì)以任務(wù)的概念在調(diào)度系統(tǒng)中進(jìn)行傳播,并經(jīng)歷幾個(gè)狀態(tài)的更新,分別標(biāo)識(shí) new、waiting、running、succeed、failed 階段。
圖 9. SQL 作業(yè)處理流程
Metrics 采集
數(shù)據(jù)服務(wù)搜集兩類 metrics,一類靜態(tài)的,用于描述 master/worker/client 的基本信息;一類是動(dòng)態(tài)的,描述 master/worker 的運(yùn)行時(shí)信息。這里主要說(shuō)明一下有關(guān)集群動(dòng)態(tài)信息的采集過(guò)程及作用。以 worker 為例,當(dāng) worker 成功注冊(cè)到 master 時(shí),就會(huì)開啟定時(shí)心跳匯報(bào)動(dòng)作,并借道心跳請(qǐng)求,將自己的運(yùn)行時(shí)信息匯報(bào)給 master。這里主要是內(nèi)存使用情況,例如當(dāng)前 worker 通過(guò)估算方法,統(tǒng)計(jì)目前運(yùn)行的任務(wù)占據(jù)了多少內(nèi)存,以便 master 能夠在后續(xù)的任務(wù)分發(fā)過(guò)程中,能夠根據(jù)內(nèi)存信息進(jìn)行決策。master 會(huì)統(tǒng)計(jì)它所管理的集群整個(gè)情況,例如每個(gè)任務(wù)隊(duì)列的快照信息、worker 的快照信息、集群的運(yùn)行時(shí)配置信息等,并通過(guò)參數(shù)控制是否打印這些信息,以便調(diào)試。調(diào)用情況
目前數(shù)據(jù)服務(wù)每天調(diào)用量:toC: 9000W /每天。toB:150W /每天(透?jìng)鞯綀?zhí)行 engine 端調(diào)用量)。- ETL 任務(wù)執(zhí)行時(shí)間基本在 3 分鐘左右完成;
- adhoc 查詢目前主要有 Spark Thrift Server,Presto,Clickhouse 3 種引擎,大部分 SQL 90% 2s 左右完成,Clickhouse 查詢 99%在 1s 左右完成,Presto 調(diào)用量 50W /日, Clickhouse 調(diào)用量 44W /日。
解決的性能問(wèn)題
數(shù)據(jù)服務(wù)主要解決 SLA 方面的問(wèn)題。如人群計(jì)算、數(shù)據(jù)無(wú)縫遷移、數(shù)據(jù)產(chǎn)品 SLA 等,這里用人群舉例說(shuō)明如下:人群計(jì)算遇到的問(wèn)題:
- 人群計(jì)算任務(wù)的數(shù)據(jù)本地性不好;
- HDFS 存在數(shù)據(jù)熱點(diǎn)問(wèn)題;
- HDFS 讀寫本身存在長(zhǎng)尾現(xiàn)象。
數(shù)據(jù)服務(wù)改造新的架構(gòu)方案:
- 計(jì)算與存儲(chǔ)同置,這樣數(shù)據(jù)就不需通過(guò)網(wǎng)絡(luò)反復(fù)讀取,造成網(wǎng)絡(luò)流量浪費(fèi)。
- 減少 HDFS 讀寫長(zhǎng)尾對(duì)人群計(jì)算造成的額外影響,同時(shí)減少人群計(jì)算對(duì)于 HDFS 穩(wěn)定性的影響。
- 廣告人群計(jì)算介于線上生產(chǎn)任務(wù)跟離線任務(wù)之間的任務(wù)類型。這里我們希望能保證這類應(yīng)用的可靠性和穩(wěn)定性,從而更好地為公司業(yè)務(wù)賦能
- 通過(guò)數(shù)據(jù)服務(wù)執(zhí)行人群計(jì)算。
圖 10. Alluxio 和 Spark 集群混部
基于 Alluxio 的緩存表同步
將 Hive 表的 location 從 HDFS 路徑替換為 Alluxio 路徑,即表示該表的數(shù)據(jù)存儲(chǔ)于 Alluxio 中。我們使用的方案不是直接寫通過(guò) ETL 任務(wù)寫 Alluxio 表的數(shù)據(jù),而是由 Alluxio 主動(dòng)去拉取同樣 Hive 表結(jié)構(gòu)的 HDFS 中的數(shù)據(jù),即我們創(chuàng)建了一個(gè) HDFS 表的 Alluxio 緩存表?;?HDFS 的人群計(jì)算底表的表結(jié)構(gòu)如下:
CREATE TABLE `hdfs.ads_tags_table`(
`oaid_md5` string,
`mid` string,
`user_id` bigint,
.........
)
PARTITIONED BY (
`dt` string)
LOCATION
'hdfs://xxxx/hdfs.db/ads_tags_table'
基于 Alluxio 的人群計(jì)算底表的表結(jié)構(gòu)如下:
CREATE TABLE `alluxio.ads_tags_table`(
`oaid_md5` string,
`mid` string,
`user_id` bigint,
.........
)
PARTITIONED BY (
`dt` string COMMENT '????')
LOCATION
'alluxio://zk@IP1:2181,IP2:2181/alluxio.db/ads_tags_table'
?兩個(gè)表結(jié)構(gòu)的字段和分區(qū)定義完全相同。只有兩處不同點(diǎn):通過(guò)不同的庫(kù)名區(qū)分了是 HDFS 的表還是 Alluxio 的表;location 具體確認(rèn)了數(shù)據(jù)存儲(chǔ)的路徑是 HDFS 還是 Alluxio。
由于 Alluxio 不能感知到分區(qū)表的變化,我們開發(fā)了一個(gè)定時(shí)任務(wù)去自動(dòng)感知源表的分區(qū)變化,使得 Hive 表的數(shù)據(jù)能夠同步到 Alluxio 中。
具體步驟如下:
- 定時(shí)任務(wù)發(fā)起輪詢,檢測(cè)源表是否有新增分區(qū)。
- 發(fā)起一個(gè) SYN2ALLUXIO 的任務(wù)由數(shù)據(jù)服務(wù)執(zhí)行。
- 任務(wù)執(zhí)行腳本為將 Alluxio 表添加與 HDFS 表相同的分區(qū)。
- 分區(qū)添加完成之后,Alluxio 會(huì)自動(dòng)從 mount 的 HDFS 路徑完成數(shù)據(jù)同步。
圖 11. Alluxio 緩存表同步
人群計(jì)算任務(wù)
上小節(jié)介紹了如何讓 Alluxio 和 HDFS 的 Hive 表保持?jǐn)?shù)據(jù)同步,接下來(lái)需要做的就是讓任務(wù)計(jì)算的 Spark 任務(wù)跑在 Spark 與 Alluxio 混部的集群上,充分利用數(shù)據(jù)的本地性以及計(jì)算資源的隔離性,提高人群計(jì)算效率。人群標(biāo)簽計(jì)算的 SQL 樣例如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM hdfs.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
上面是一個(gè) Spark SQL 的 ETL,此處的 hdfs.ads_tags_table 即為人群計(jì)算依賴的底表,此表為一個(gè) HDFS location 的表。
人群服務(wù)通過(guò)調(diào)用數(shù)據(jù)服務(wù)執(zhí)行。數(shù)據(jù)服務(wù)根據(jù)底表分區(qū)是否同步到 Alluxio 決定是否需要下推是用 Alluxio 表來(lái)完成計(jì)算。如果底表數(shù)據(jù)已經(jīng)同步到 Alluxio,則使用 Alluxio 表來(lái)做為底表計(jì)算人群。
下推邏輯是用 Alluxio 的表名替換原表,假設(shè)此處緩存的 Alluxio 表名為 alluxio.ads_tags_table,那么原 SQL 就會(huì)被改寫成如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM alluxio.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
依靠數(shù)據(jù)服務(wù)調(diào)度系統(tǒng),通過(guò)用戶 SQL 改寫以及 Alluxio 和 Spark 計(jì)算結(jié)點(diǎn)混部模式,人群計(jì)算任務(wù)提速了 10%~30%小結(jié)
雖然截至今天,Hera 數(shù)據(jù)服務(wù)已經(jīng)支持了很多生產(chǎn)業(yè)務(wù),但目前仍有很多需要完善的地方:- 不同 engine 存在同一個(gè)含義函數(shù)寫法不一致的情況。這種情況在 Presto 跟 ClickHouse 的函數(shù)比較時(shí)尤為突出,如 Presto 的 strpos(string,substring)函數(shù),在 Clickhouse 中為 position(haystack, needle[, start_pos]),且這些函數(shù)的參數(shù)順序存在不一致的情況,如何更優(yōu)雅地支持不同 engine 的差異情況還需要進(jìn)一步思考。
- 人群計(jì)算采用業(yè)界通用的 ClickHouse BitMap 解決方案落地,提升人群的計(jì)算效率同時(shí)擴(kuò)展數(shù)據(jù)服務(wù)的業(yè)務(wù)邊界。
- 數(shù)據(jù)服務(wù)支持調(diào)度的 HA 和災(zāi)備完善,更好地在 K8s 上進(jìn)行部署。