久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

一套 SQL 搞定數(shù)據(jù)倉(cāng)庫(kù)?Flink有了新嘗試

 liang1234_ 2020-03-19

阿里妹導(dǎo)讀:數(shù)據(jù)倉(cāng)庫(kù)是公司數(shù)據(jù)發(fā)展到一定規(guī)模后必然需要提供的一種基礎(chǔ)服務(wù),,也是“數(shù)據(jù)智能”建設(shè)的基礎(chǔ)環(huán)節(jié),。迅速獲取數(shù)據(jù)反饋不僅有利于改善產(chǎn)品及用戶(hù)體驗(yàn),更有利于公司的科學(xué)決策,,因此獲取數(shù)據(jù)的實(shí)時(shí)性尤為重要,。
目前企業(yè)的數(shù)倉(cāng)建設(shè)大多是離線一套,實(shí)時(shí)一套,。業(yè)務(wù)要求低延時(shí)的使用實(shí)時(shí)數(shù)倉(cāng),;業(yè)務(wù)復(fù)雜的使用離線數(shù)倉(cāng)。架構(gòu)十分復(fù)雜,,需要使用很多系統(tǒng)和計(jì)算框架,,這就要求企業(yè)儲(chǔ)備多方面的人才,導(dǎo)致人才成本較高,,且出了問(wèn)題難以排查,,終端用戶(hù)也需要熟悉多種語(yǔ)法。本文分析目前的數(shù)倉(cāng)架構(gòu),,探索離線和實(shí)時(shí)數(shù)倉(cāng)是否能放在一起考慮,,探索Flink的統(tǒng)一架構(gòu)是否能解決大部分問(wèn)題。

文末有福利,,可下載電子書(shū),。

數(shù)倉(cāng)架構(gòu)

數(shù)據(jù)倉(cāng)庫(kù)可以分為三層:ODS(原始數(shù)據(jù)層)、DW(數(shù)據(jù)倉(cāng)庫(kù)層)、ADS(應(yīng)用數(shù)據(jù)層),。

1. ODS (Operation Data Store) 層

從日志或者業(yè)務(wù)DB傳輸過(guò)來(lái)的原始數(shù)據(jù),,傳統(tǒng)的離線數(shù)倉(cāng)做法也有直接用CDC (Change Data Capture) 工具周期同步到數(shù)倉(cāng)里面。用一套統(tǒng)一的Kafka來(lái)承接這個(gè)角色,,可以讓數(shù)據(jù)更實(shí)時(shí)的落入數(shù)倉(cāng),,也可以在這一層統(tǒng)一實(shí)時(shí)和離線的。

2. DW (Data warehouse) 層

DW層一般也分為DWD層和DWS層:

  • DWD (Data warehouse detail) 層:明細(xì)數(shù)據(jù)層,,這一層的數(shù)據(jù)應(yīng)該是經(jīng)過(guò)清洗的,,干凈的、準(zhǔn)確的數(shù)據(jù),,它包含的信息和ODS層相同,,但是它遵循數(shù)倉(cāng)和數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)Schema定義。

  • DWS (Data warehouse service) 層:匯總數(shù)據(jù)層,,這一層可能經(jīng)過(guò)了輕度的聚合,,可能是星型或雪花模型的結(jié)構(gòu)數(shù)據(jù),這一層已經(jīng)做了一些業(yè)務(wù)層的計(jì)算,,用戶(hù)可以基于這一層,,計(jì)算出數(shù)據(jù)服務(wù)所需數(shù)據(jù)。

3. ADS (Application Data Store) 層

和DWS不同的是,,這一層直接面向用戶(hù)的數(shù)據(jù)服務(wù),,不需要再次計(jì)算,已經(jīng)是最終需要的數(shù)據(jù),。

主要分為兩條鏈路:

  1. 業(yè)務(wù)DB和日志 -> Kafka -> 實(shí)時(shí)數(shù)倉(cāng) (Kafka Dim維表) -> BI DB -> 數(shù)據(jù)服務(wù)

  2. 業(yè)務(wù)DB和日志 -> Kafka -> 離線數(shù)倉(cāng) (Hive metastore HDFS) -> BI DB -> 數(shù)據(jù)服務(wù)

主流的數(shù)倉(cāng)架構(gòu)仍然是Lambda架構(gòu),,Lambda架構(gòu)雖然復(fù)雜,但是它能覆蓋業(yè)務(wù)上需要的場(chǎng)景,,對(duì)業(yè)務(wù)來(lái)說(shuō),是最靈活的方式,。

Lambda架構(gòu)分為兩條鏈路:

  • 傳統(tǒng)離線數(shù)據(jù)具有穩(wěn)定,、計(jì)算復(fù)雜、靈活的優(yōu)點(diǎn),,運(yùn)行批計(jì)算,,保證T 1的報(bào)表產(chǎn)生和靈活的Ad-hoc查詢(xún)。

  • 實(shí)時(shí)數(shù)倉(cāng)提供低延時(shí)的數(shù)據(jù)服務(wù),,傳統(tǒng)的離線數(shù)倉(cāng)往往都是T 1的延時(shí),,這導(dǎo)致分析人員沒(méi)法做一些實(shí)時(shí)化的決策,而實(shí)時(shí)數(shù)倉(cāng)整條鏈路的延遲最低甚至可以做到秒級(jí),,這不但加快了分析和決策,,而且也給更多的業(yè)務(wù)帶來(lái)了可能,比如實(shí)時(shí)化的監(jiān)控報(bào)警。Flink的強(qiáng)項(xiàng)是實(shí)時(shí)計(jì)算,、流計(jì)算,,而Kafka是實(shí)時(shí)數(shù)倉(cāng)存儲(chǔ)的核心。

上圖標(biāo)出了1-9條邊,,每條邊代表數(shù)據(jù)的轉(zhuǎn)換,,就是大數(shù)據(jù)的計(jì)算,本文后續(xù)將分析這些邊,,探索Flink在其中可以發(fā)揮的作用,。

Flink一棧式計(jì)算

元數(shù)據(jù)

先說(shuō)下元數(shù)據(jù)的管理,離線數(shù)倉(cāng)有Hive metastore來(lái)管理元數(shù)據(jù),,但是單純的Kafka不具備元數(shù)據(jù)管理的能力,,這里推薦兩種做法:

1. Confluent schema registry

搭建起schema registry服務(wù)后,通過(guò)confluent的url即可獲取到表的schema信息,,對(duì)于上百個(gè)字段的表,,它可以省編寫(xiě)Flink作業(yè)時(shí)的很多事,后續(xù)Flink也正在把它的schema推斷功能結(jié)合Confluent schema registry,。但是它仍然省不掉創(chuàng)建表的過(guò)程,,用戶(hù)也需要填寫(xiě)Confluent對(duì)應(yīng)的URL。

2. Catalog

目前Flink內(nèi)置已提供了HiveCatalog,,Kafka的表可以直接集成到Hive metastore中,,用戶(hù)在SQL中可以直接使用這些表。但是Kafka的start-offset一些場(chǎng)景需要靈活的配置,,為此,,F(xiàn)link也正在提供 LIKE [1] 和 Table Hints [2] 等手段來(lái)解決。

Flink中離線數(shù)倉(cāng)和實(shí)時(shí)數(shù)倉(cāng)都使用Hive Catalog:

use catalog my_hive;-- build streaming database and tables;create database stream_db;use stream_db;create table order_table ( id long, amount double, user_id long, status string, ts timestamp, … -- 可能還有幾十個(gè)字段 ts_day string, ts_hour string) with ( ‘connector.type’ = ‘kafka’, … -- Kafka table相關(guān)配置);-- build batch database and tables;create database batch_db;use batch_db;create table order_table like stream_db.order_table (excluding options)partitioned by (ts_day, ts_hour)with ( ‘connector.type’ = ‘hive’, … -- Hive table相關(guān)配置);


使用Catalog,,后續(xù)的計(jì)算可以完全復(fù)用批和流,,提供相同的體驗(yàn)。 

數(shù)倉(cāng)導(dǎo)入

計(jì)算①和⑤分別是實(shí)時(shí)數(shù)倉(cāng)的導(dǎo)入和離線數(shù)倉(cāng)的導(dǎo)入,,近來(lái),,更加實(shí)時(shí)的離線數(shù)倉(cāng)導(dǎo)入越來(lái)越成為數(shù)據(jù)倉(cāng)庫(kù)的常規(guī)做法,F(xiàn)link的導(dǎo)入可以讓離線數(shù)倉(cāng)的數(shù)據(jù)更實(shí)時(shí)化,。

以前主要通過(guò)DataStream StreamingFileSink的方式進(jìn)行導(dǎo)入,,但是不支持ORC和無(wú)法更新HMS。

Flink streaming integrate Hive后,,提供Hive的streaming sink [3],,用SQL的方式會(huì)更方便靈活,使用SQL的內(nèi)置函數(shù)和UDF,,而且流和批可以復(fù)用,,運(yùn)行兩個(gè)流計(jì)算作業(yè),。

insert into [stream_db.|batch_db.]order_table select … from log_table;


數(shù)據(jù)處理 

計(jì)算②和⑥分別是實(shí)時(shí)數(shù)倉(cāng)和離線數(shù)倉(cāng)的中間數(shù)據(jù)處理,這里面主要有三種計(jì)算:

  1. ETL:和數(shù)據(jù)導(dǎo)入一樣,,批流沒(méi)有區(qū)別,。

  2. 維表Join:維表補(bǔ)字段是很常見(jiàn)的數(shù)倉(cāng)操作,離線數(shù)倉(cāng)中基本都是直接Join Hive表即可,,但是Streaming作業(yè)卻有些不同,,下文將詳細(xì)描述。

  3. Aggregation:Streaming作業(yè)在這些有狀態(tài)的計(jì)算中,,產(chǎn)生的不是一次確定的值,,而可能是不斷變化的值。

維表Join

與離線計(jì)算不同,,離線計(jì)算只用關(guān)心某個(gè)時(shí)間點(diǎn)的維表數(shù)據(jù),,而Streaming的作業(yè)持續(xù)運(yùn)行,所以它關(guān)注的不能只是靜態(tài)數(shù)據(jù),,需要是動(dòng)態(tài)的維表,。

另外為了Join的效率,streaming作業(yè)往往是join一個(gè)數(shù)據(jù)庫(kù)表,,而不僅僅是Hive表,。

例子:

-- stream 維表use stream_db;create table user_info ( user_id long, age int, address, primary key(user_id)) with ( ‘connector.type’ = ‘jdbc’, ...); -- 將離線數(shù)倉(cāng)的維表導(dǎo)入實(shí)時(shí)數(shù)倉(cāng)中insert into user_info select * from batch_db.user_info; -- 維表Join,SQL批流復(fù)用insert into order_with_user_age select * from order_table join user_info for system_time as of order_table.proctime on user_info.user_id = user_info.user_id;

這里有個(gè)非常麻煩的事情,,那就是在實(shí)時(shí)數(shù)倉(cāng)中,,需要按時(shí)周期調(diào)度更新維表到實(shí)時(shí)維表數(shù)據(jù)庫(kù)中,那能不能直接Join離線數(shù)倉(cāng)的Hive維表呢,?目前社區(qū)也正在開(kāi)發(fā)Hive維表,,它有哪些挑戰(zhàn): 

Hive維表太大,放不進(jìn)Cache中:

  • 考慮Shuffle by key,,分布式的維表Join,,減少單并發(fā)Cache的數(shù)據(jù)量

  • 考慮將維表數(shù)據(jù)放入State中

維表更新問(wèn)題:

  • 簡(jiǎn)單的方案是TTL過(guò)期

  • 復(fù)雜一些的方案是實(shí)現(xiàn)Hive streaming source,并結(jié)合Flink的watermark機(jī)制

有狀態(tài)計(jì)算和數(shù)據(jù)導(dǎo)出

例子:

select age, avg(amount) from order_with_user_age group by age;

一句簡(jiǎn)單的聚合SQL,,它在批計(jì)算和流計(jì)算的執(zhí)行模式是完全不同的,。 

Streaming的聚合和離線計(jì)算的聚合最大的不同在于它是一個(gè)動(dòng)態(tài)表[4],它的輸出是在持續(xù)變化的,。動(dòng)態(tài)表的概念簡(jiǎn)單來(lái)說(shuō),一個(gè)streaming的count,,它的輸出是由輸入來(lái)驅(qū)動(dòng)的,,而不是像batch一樣,獲取全部輸入后才會(huì)輸出,,所以,,它的結(jié)果是動(dòng)態(tài)變化的:

  • 如果在SQL內(nèi)部,,F(xiàn)link內(nèi)部的retract機(jī)制會(huì)保證SQL 的結(jié)果的與批一樣。

  • 如果是外部的存儲(chǔ),,這給sink帶來(lái)了挑戰(zhàn),。

有狀態(tài)計(jì)算后的輸出:

  • 如果sink是一個(gè)可更新的數(shù)據(jù)庫(kù),比如HBase/Redis/JDBC,,那這看起來(lái)不是問(wèn)題,,我們只需要不斷的去更新就好了。

  • 但是如果是不可更新的存儲(chǔ)呢,,我們沒(méi)有辦法去更新原本的數(shù)據(jù),。為此,F(xiàn)link提出了Changelog的支持[5],,想內(nèi)置支持這種sink,,輸出特定Schema的數(shù)據(jù),讓下游消費(fèi)者也能很好的work起來(lái),。

例子:

-- batch:計(jì)算完成后,,一次性輸出到mysql中,同key只有一個(gè)數(shù)據(jù)-- streaming:mysql里面的數(shù)據(jù)不斷更新,,不斷變化insert into mysql_table select age, avg(amount) from order_with_user_age group by age;-- batch: 同key只有一個(gè)數(shù)據(jù),,append即可insert into hive_table select age, avg(amount) from order_with_user_age group by age;-- streaming: kafka里面的數(shù)據(jù)不斷append,并且多出一列,,來(lái)表示這是upsert的消息,,后續(xù)的Flink消費(fèi)會(huì)自動(dòng)做出機(jī)制來(lái)處理upsertinsert into kafka_table select age, avg(amount) from order_with_user_age group by age;

AD-HOC與OLAP 

離線數(shù)倉(cāng)可以進(jìn)行計(jì)算⑨,對(duì)明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都可以進(jìn)行ad-hoc的查詢(xún),,可以讓數(shù)據(jù)分析師進(jìn)行靈活的查詢(xún),。

目前實(shí)時(shí)數(shù)倉(cāng)一個(gè)比較大的缺點(diǎn)是不能Ad-hoc查詢(xún),因?yàn)樗旧頉](méi)有保存歷史數(shù)據(jù),,Kafka可能可以保存3天以上的數(shù)據(jù),,但是一是存儲(chǔ)成本高、二是查詢(xún)效率也不好,。

一個(gè)思路是提供OLAP數(shù)據(jù)庫(kù)的批流統(tǒng)一Sink組件:

  • Druid sink

  • Doris sink

  • Clickhouse sink

  • HBase/Phoenix sink

總結(jié)

本文從目前的Lambda架構(gòu)出發(fā),,分析了Flink一棧式數(shù)倉(cāng)計(jì)算方案的能力,本文中一些Flink新功能還在快速迭代演進(jìn)中,,隨著不斷的探索和實(shí)踐,,希望朝著計(jì)算一體化的方向逐漸推進(jìn),將來(lái)的數(shù)倉(cāng)架構(gòu)希望能真正統(tǒng)一用戶(hù)的離線和實(shí)時(shí),,提供統(tǒng)一的體驗(yàn):

  • 統(tǒng)一元數(shù)據(jù)

  • 統(tǒng)一SQL開(kāi)發(fā)

  • 統(tǒng)一數(shù)據(jù)導(dǎo)入與導(dǎo)出

  • 將來(lái)考慮統(tǒng)一存儲(chǔ)

參考

[1]https://cwiki./confluence/display/FLINK/FLIP-110%3A Support LIKE clause in CREATE TABLE

[2]https://cwiki./confluence/display/FLINK/FLIP-113%3A Supports Table Hints

[3]https://cwiki./confluence/display/FLINK/FLIP-115%3A Filesystem connector in Table

[4]https://ci./projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html

[5]https://cwiki./confluence/display/FLINK/FLIP-105%3A Support to Interpret and Emit Changelog in Flink SQL

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購(gòu)買(mǎi)等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多