數(shù)倉(cāng)架構(gòu) 數(shù)據(jù)倉(cāng)庫(kù)可以分為三層:ODS(原始數(shù)據(jù)層)、DW(數(shù)據(jù)倉(cāng)庫(kù)層)、ADS(應(yīng)用數(shù)據(jù)層),。 1. ODS (Operation Data Store) 層 從日志或者業(yè)務(wù)DB傳輸過(guò)來(lái)的原始數(shù)據(jù),,傳統(tǒng)的離線數(shù)倉(cāng)做法也有直接用CDC (Change Data Capture) 工具周期同步到數(shù)倉(cāng)里面。用一套統(tǒng)一的Kafka來(lái)承接這個(gè)角色,,可以讓數(shù)據(jù)更實(shí)時(shí)的落入數(shù)倉(cāng),,也可以在這一層統(tǒng)一實(shí)時(shí)和離線的。 2. DW (Data warehouse) 層 DW層一般也分為DWD層和DWS層:
3. ADS (Application Data Store) 層 和DWS不同的是,,這一層直接面向用戶(hù)的數(shù)據(jù)服務(wù),,不需要再次計(jì)算,已經(jīng)是最終需要的數(shù)據(jù),。 主要分為兩條鏈路:
主流的數(shù)倉(cāng)架構(gòu)仍然是Lambda架構(gòu),,Lambda架構(gòu)雖然復(fù)雜,但是它能覆蓋業(yè)務(wù)上需要的場(chǎng)景,,對(duì)業(yè)務(wù)來(lái)說(shuō),是最靈活的方式,。 Lambda架構(gòu)分為兩條鏈路:
上圖標(biāo)出了1-9條邊,,每條邊代表數(shù)據(jù)的轉(zhuǎn)換,,就是大數(shù)據(jù)的計(jì)算,本文后續(xù)將分析這些邊,,探索Flink在其中可以發(fā)揮的作用,。 Flink一棧式計(jì)算 元數(shù)據(jù) 先說(shuō)下元數(shù)據(jù)的管理,離線數(shù)倉(cāng)有Hive metastore來(lái)管理元數(shù)據(jù),,但是單純的Kafka不具備元數(shù)據(jù)管理的能力,,這里推薦兩種做法: 1. Confluent schema registry 搭建起schema registry服務(wù)后,通過(guò)confluent的url即可獲取到表的schema信息,,對(duì)于上百個(gè)字段的表,,它可以省編寫(xiě)Flink作業(yè)時(shí)的很多事,后續(xù)Flink也正在把它的schema推斷功能結(jié)合Confluent schema registry,。但是它仍然省不掉創(chuàng)建表的過(guò)程,,用戶(hù)也需要填寫(xiě)Confluent對(duì)應(yīng)的URL。 2. Catalog 目前Flink內(nèi)置已提供了HiveCatalog,,Kafka的表可以直接集成到Hive metastore中,,用戶(hù)在SQL中可以直接使用這些表。但是Kafka的start-offset一些場(chǎng)景需要靈活的配置,,為此,,F(xiàn)link也正在提供 LIKE [1] 和 Table Hints [2] 等手段來(lái)解決。 Flink中離線數(shù)倉(cāng)和實(shí)時(shí)數(shù)倉(cāng)都使用Hive Catalog: use catalog my_hive; -- build streaming database and tables; create database stream_db; use stream_db; create table order_table ( id long, amount double, user_id long, status string, ts timestamp, … -- 可能還有幾十個(gè)字段 ts_day string, ts_hour string ) with ( ‘connector.type’ = ‘kafka’, … -- Kafka table相關(guān)配置 ); -- build batch database and tables; create database batch_db; use batch_db; create table order_table like stream_db.order_table (excluding options) partitioned by (ts_day, ts_hour) with ( ‘connector.type’ = ‘hive’, … -- Hive table相關(guān)配置 );
數(shù)倉(cāng)導(dǎo)入 計(jì)算①和⑤分別是實(shí)時(shí)數(shù)倉(cāng)的導(dǎo)入和離線數(shù)倉(cāng)的導(dǎo)入,,近來(lái),,更加實(shí)時(shí)的離線數(shù)倉(cāng)導(dǎo)入越來(lái)越成為數(shù)據(jù)倉(cāng)庫(kù)的常規(guī)做法,F(xiàn)link的導(dǎo)入可以讓離線數(shù)倉(cāng)的數(shù)據(jù)更實(shí)時(shí)化,。 以前主要通過(guò)DataStream StreamingFileSink的方式進(jìn)行導(dǎo)入,,但是不支持ORC和無(wú)法更新HMS。 Flink streaming integrate Hive后,,提供Hive的streaming sink [3],,用SQL的方式會(huì)更方便靈活,使用SQL的內(nèi)置函數(shù)和UDF,,而且流和批可以復(fù)用,,運(yùn)行兩個(gè)流計(jì)算作業(yè),。
計(jì)算②和⑥分別是實(shí)時(shí)數(shù)倉(cāng)和離線數(shù)倉(cāng)的中間數(shù)據(jù)處理,這里面主要有三種計(jì)算:
維表Join 與離線計(jì)算不同,,離線計(jì)算只用關(guān)心某個(gè)時(shí)間點(diǎn)的維表數(shù)據(jù),,而Streaming的作業(yè)持續(xù)運(yùn)行,所以它關(guān)注的不能只是靜態(tài)數(shù)據(jù),,需要是動(dòng)態(tài)的維表,。 另外為了Join的效率,streaming作業(yè)往往是join一個(gè)數(shù)據(jù)庫(kù)表,,而不僅僅是Hive表,。 例子: -- stream 維表 use stream_db; create table user_info ( user_id long, age int, address, primary key(user_id) ) with ( ‘connector.type’ = ‘jdbc’, ... ); -- 將離線數(shù)倉(cāng)的維表導(dǎo)入實(shí)時(shí)數(shù)倉(cāng)中 insert into user_info select * from batch_db.user_info; -- 維表Join,SQL批流復(fù)用 insert into order_with_user_age select * from order_table join user_info for system_time as of order_table.proctime on user_info.user_id = user_info.user_id; 這里有個(gè)非常麻煩的事情,,那就是在實(shí)時(shí)數(shù)倉(cāng)中,,需要按時(shí)周期調(diào)度更新維表到實(shí)時(shí)維表數(shù)據(jù)庫(kù)中,那能不能直接Join離線數(shù)倉(cāng)的Hive維表呢,?目前社區(qū)也正在開(kāi)發(fā)Hive維表,,它有哪些挑戰(zhàn): Hive維表太大,放不進(jìn)Cache中:
維表更新問(wèn)題:
有狀態(tài)計(jì)算和數(shù)據(jù)導(dǎo)出 例子:
一句簡(jiǎn)單的聚合SQL,,它在批計(jì)算和流計(jì)算的執(zhí)行模式是完全不同的,。 Streaming的聚合和離線計(jì)算的聚合最大的不同在于它是一個(gè)動(dòng)態(tài)表[4],它的輸出是在持續(xù)變化的,。動(dòng)態(tài)表的概念簡(jiǎn)單來(lái)說(shuō),一個(gè)streaming的count,,它的輸出是由輸入來(lái)驅(qū)動(dòng)的,,而不是像batch一樣,獲取全部輸入后才會(huì)輸出,,所以,,它的結(jié)果是動(dòng)態(tài)變化的:
有狀態(tài)計(jì)算后的輸出:
例子: -- batch:計(jì)算完成后,,一次性輸出到mysql中,同key只有一個(gè)數(shù)據(jù) -- streaming:mysql里面的數(shù)據(jù)不斷更新,,不斷變化 insert into mysql_table select age, avg(amount) from order_with_user_age group by age; -- batch: 同key只有一個(gè)數(shù)據(jù),,append即可 insert into hive_table select age, avg(amount) from order_with_user_age group by age; -- streaming: kafka里面的數(shù)據(jù)不斷append,并且多出一列,,來(lái)表示這是upsert的消息,,后續(xù)的Flink消費(fèi)會(huì)自動(dòng)做出機(jī)制來(lái)處理upsert insert into kafka_table select age, avg(amount) from order_with_user_age group by age;
AD-HOC與OLAP 離線數(shù)倉(cāng)可以進(jìn)行計(jì)算⑨,對(duì)明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都可以進(jìn)行ad-hoc的查詢(xún),,可以讓數(shù)據(jù)分析師進(jìn)行靈活的查詢(xún),。 目前實(shí)時(shí)數(shù)倉(cāng)一個(gè)比較大的缺點(diǎn)是不能Ad-hoc查詢(xún),因?yàn)樗旧頉](méi)有保存歷史數(shù)據(jù),,Kafka可能可以保存3天以上的數(shù)據(jù),,但是一是存儲(chǔ)成本高、二是查詢(xún)效率也不好,。 一個(gè)思路是提供OLAP數(shù)據(jù)庫(kù)的批流統(tǒng)一Sink組件:
總結(jié) 本文從目前的Lambda架構(gòu)出發(fā),,分析了Flink一棧式數(shù)倉(cāng)計(jì)算方案的能力,本文中一些Flink新功能還在快速迭代演進(jìn)中,,隨著不斷的探索和實(shí)踐,,希望朝著計(jì)算一體化的方向逐漸推進(jìn),將來(lái)的數(shù)倉(cāng)架構(gòu)希望能真正統(tǒng)一用戶(hù)的離線和實(shí)時(shí),,提供統(tǒng)一的體驗(yàn):
[1]https://cwiki./confluence/display/FLINK/FLIP-110%3A Support LIKE clause in CREATE TABLE [2]https://cwiki./confluence/display/FLINK/FLIP-113%3A Supports Table Hints [3]https://cwiki./confluence/display/FLINK/FLIP-115%3A Filesystem connector in Table [4]https://ci./projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html [5]https://cwiki./confluence/display/FLINK/FLIP-105%3A Support to Interpret and Emit Changelog in Flink SQL |
|