背景
經(jīng)營分析,、決策支持是現(xiàn)代企業(yè)的一個讓數(shù)據(jù)發(fā)揮有效價值的分析型系統(tǒng)。
在各個行業(yè)中隨處可見,,例如共享充電寶中,,協(xié)助銷售了解實時的設(shè)備租賃情況,銷售業(yè)績,。在電商中,,協(xié)助小二和商戶發(fā)掘目標用戶群體,。金融行業(yè)中,協(xié)助輸出國民的存款,、消費,、貸款的畫像。
PostgreSQL, Greenplum都是非常適合于經(jīng)營分析,、決策支持的數(shù)據(jù)庫,。因為它們具備了一些特性,適合實時的分析透視,。(流式計算,、合并寫入、閱后即焚,、GIN倒排索引,、varbit類型、列存儲,、BITMAP合并掃描,、HLL估值類型、采樣算法等等),。
我也寫過很多實際的應(yīng)用案例,,可以參考本文末尾。
經(jīng)營分析系統(tǒng)的需求大同小異,,在手機行業(yè)中,,以imei或imsi為KEY,每個手機根據(jù)它的用戶的行為,,生成一些屬性,,針對每個屬性,劃分出不同的標簽,,形成了手機用戶的畫像,。再針對畫像進行人群的圈選、透視,,協(xié)助分析,。
例如,基于PostgreSQL數(shù)組以及GIN索引的設(shè)計:
經(jīng)營分析設(shè)計示例
1,、目標設(shè)計
2,、表結(jié)構(gòu)設(shè)計
3、屬性表
4,、標簽表
5,、標簽表索引設(shè)計
6、打標簽(含新增、更新,、刪除標簽)測試
7,、圈選測試
8、透視測試
9,、決策設(shè)計示例
流式+函數(shù)式計算
結(jié)構(gòu)設(shè)計
1,、手機用戶屬性表
create table tbl1 (
imei text primary key, -- 手機唯一標識
v1 int, -- 年齡
v2 float8, -- 收入
v3 geometry, -- 住址經(jīng)緯
v4 geometry, -- 公司經(jīng)緯
v5 char(1), -- 性別
v6 timestamp, -- 最后活躍時間
v7 int2, -- 每日在線時長
v8 int2, -- 星座
v9 text, -- 其他標簽。,。,。。,。
......
);
2,、標簽元數(shù)據(jù)表
create table tbl2 (
tagid int primary key, -- 標簽名
desc text, -- 描述,例如性別,,年齡分段,,收入分段,區(qū)域等等,,作為一個標簽標識,。
);
3、標簽表
create table tbl3 (
imei text primary key, -- 手機唯一標識
tagids int[], -- 標簽數(shù)組
ins_tags int[], -- 合并操作需要的中間字段
del_tags int[] -- 合并操作需要的中間字段
);
create index idx_tbl3_tagids on tbl3 using gin (tagids gin__int_ops);
或
create index idx_tbl3_tagids on tbl3 using gist (tagids gist__intbig_ops);
或
create index idx_tbl3_tagids on tbl3 using gist (tagids gist__int_ops);
4,、標簽表與屬性表實際上可以合一,,在透視時,可以避免一次JOIN(降低透視的耗時),,但是會引入更新IO放大的問題,,因為屬性表可能是寬表,。
根據(jù)實際的性能情況來選擇是否合一,。
需求與SQL設(shè)計
1、圈人
select imei from tbl3 where tagids @> array[標簽1, 標簽2]; -- 查找包含標簽1,,標簽2的人群,。
select imei from tbl3 where tagids && array[標簽1, 標簽2]; -- 查找包含標簽1,標簽2中任意一個或多個的人群,。
select imei from tbl3 where tagids && array[標簽1, 標簽2] and tagid @> array[標簽3, 標簽4]; -- 查找包含標簽3,,標簽4。同時包含標簽1,,標簽2中任意一個或多個的人群,。
2、針對圈出人群的精準透視
select v8,count(*) from tbl1 where
imei = any (array(
select imei from tbl3 where tagids @> array[標簽1, 標簽2]
) )
group by v8;
3,、新增或追加標簽
使用intarray插件,,簡化數(shù)組交、并、差操作,。
create extension intarray;
insert into tbl3 (imei, tagids) values (?, ?[]) on conflict (imei) do update set tagids=tbl3.tagids|excluded.tagids;
4,、刪標簽
update tbl3 set tagids = tagids - ?[] where imei=?;
5、更新標簽
update tbl3 set tagids = ?[] where imei=?;
6,、批量并行新增,、追加、刪除,、更新標簽優(yōu)化
如果要一次性操作很多條記錄(例如1000萬條記錄),,并且有并行的貼標簽操作(同一條用戶被多個SQL更新)。需要注意兩個問題:
6.1 大事務(wù)導(dǎo)致膨脹的問題,,建議分段操作,。
6.2 行鎖沖突問題,建議新增(插入),,然后合并到標簽表,。
優(yōu)化方法,
實現(xiàn)標簽最終一致性,。
將直接增,、刪、改標簽表,,改成寫行為日志tag_log,,采用任務(wù)調(diào)度,批量合并到標簽表:
create table tag_log (
imei text, -- 手機唯一標識
action text, -- insert, delete 表示增加,、刪除標簽 (更新需求應(yīng)該沒有,,如有,直接到標簽表操作)
tagids int[], -- 標簽IDs
crt_time timestamp default clock_timestamp() -- 時間
);
create index idx_tag_log_1 on tag_log (crt_time);
-- 16個分區(qū)表
do language plpgsql $$
declare
begin
for i in 0..15 loop
execute format('create table tag_log%s (like tag_log including all) inherits(tag_log)', i);
end loop;
end;
$$;
串行任務(wù),,閱后即焚(假設(shè)-99999999是一個永遠不存在的TAGID)
-- CTE語法,,支持閱后即焚的批量合并方法
with tmp as (delete from tag_log where ctid = any ( array (
select ctid from tag_log order by crt_time limit 10000 -- 按時序,批量取1萬條
)) returning * )
, tmp1 as (select imei,
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
insert into tbl3 (imei, tagids, ins_tags, del_tags)
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;
并行任務(wù),,閱后即焚
例如開啟16個并行
abs(mod(hashtext(imei), 16))=?
-- CTE語法,,支持閱后即焚的批量合并方法
with tmp as (delete from tag_log where ctid = any ( array (
select ctid from tag_log where abs(mod(hashtext(imei), 16))=0 order by crt_time limit 10000 -- 按時序,批量取1萬條,,按HASH并行
)) returning * )
, tmp1 as (select imei,
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
insert into tbl3 (imei, tagids, ins_tags, del_tags)
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags;
寫成函數(shù),,方便調(diào)用
create or replace function consume_tag_log(mo int, mov int, lim int) returns void as $$
declare
begin
execute format($_$with tmp as (delete from tag_log where ctid = any ( array (
select ctid from tag_log where abs(mod(hashtext(imei), %s))=%s order by crt_time limit %s
)) returning * )
, tmp1 as (select imei,
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
insert into tbl3 (imei, tagids, ins_tags, del_tags)
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
mo, mov, lim);
end;
$$ language plpgsql strict;
select consume_tag_log(16,0,10000); -- 并行處理
select consume_tag_log(16,1,10000);
.....
select consume_tag_log(16,15,10000);
create or replace function consume_tag_log(lim int) returns void as $$
declare
begin
execute format($_$with tmp as (delete from tag_log where ctid = any ( array (
select ctid from tag_log order by crt_time limit %s
)) returning * )
, tmp1 as (select imei,
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
insert into tbl3 (imei, tagids, ins_tags, del_tags)
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
on conflict (imei) do update set tagids=((tbl3.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
lim);
end;
$$ language plpgsql strict;
select consume_tag_log(10000); -- 每次處理1萬條
創(chuàng)建調(diào)度任務(wù),執(zhí)行消費函數(shù)調(diào)度即可,。
閱后即焚的處理速度,,每秒 百萬行。
《(OLTP) 高吞吐數(shù)據(jù)進出(堆存,、行掃,、無需索引) - 閱后即焚(讀寫大吞吐并測)》
性能驗證
1,、標簽取值范圍5萬,正態(tài)分布
2,、多表批量寫入函數(shù)
create or replace function ins(
imei text,
tagids int[]
) returns void as $$
declare
suffix int := abs(mod(hashtext(imei),16));
begin
execute format($_$insert into tag_log%s values ('%s', 'insert', '%s'::int[])$_$, suffix, imei, tagids);
end;
$$ language plpgsql strict;
3,、多表批量消費
標簽表分表
do language plpgsql $$
declare
begin
for i in 0..15 loop
execute format('create table tbl3_%s (like tbl3 including all) inherits(tbl3)', i);
end loop;
end;
$$;
多表批量消費
CREATE OR REPLACE FUNCTION public.consume_tag_log(suffix int, lim integer)
RETURNS void
LANGUAGE plpgsql
STRICT
AS $function$
declare
begin
execute format($_$with tmp as (delete from tag_log%s where ctid = any ( array (
select ctid from tag_log%s order by crt_time limit %s -- 按時序,批量取1萬條,,按HASH并行
)) returning * )
, tmp1 as (select imei,
uniq(sort(array_agg(case when action='insert' then tagids else -99999999 end))) - (-99999999) AS ins_tags,
uniq(sort(array_agg(case when action='delete' then tagids else -99999999 end))) - (-99999999) AS del_tags
from (select imei, action, unnest(tagids) as tagids from tmp) t group by imei)
insert into tbl3_%s (imei, tagids, ins_tags, del_tags)
select imei, ins_tags-del_tags, ins_tags, del_tags from tmp1
on conflict (imei) do update set tagids=((tbl3_%s.tagids | excluded.ins_tags) - excluded.del_tags), ins_tags=excluded.ins_tags, del_tags=excluded.del_tags$_$,
suffix, suffix, lim, suffix, suffix);
end;
$function$;
4,、數(shù)據(jù)寫入壓測腳本
vi test.sql
\set tag1 random_gaussian(1, 50000, 20)
\set tag2 random_gaussian(1, 50000, 20)
\set tag3 random_gaussian(1, 50000, 20)
\set tag4 random_gaussian(1, 50000, 20)
\set tag5 random_gaussian(1, 50000, 20)
\set tag6 random_gaussian(1, 50000, 20)
\set tag7 random_gaussian(1, 50000, 20)
\set tag8 random_gaussian(1, 50000, 20)
\set imei random(1,1000000000)
select ins(:imei, (array[:tag1,:tag2,:tag3,:tag4,:tag5,:tag6,:tag7,:tag8])::int[]);
nohup pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 3000 >./tag.log 2>&1 &
5、數(shù)據(jù)消費,,并行調(diào)度
用秒殺技術(shù)實現(xiàn)并行調(diào)度,,避免單個HASH被重復(fù)調(diào)用。
《HTAP數(shù)據(jù)庫 PostgreSQL 場景與性能測試之 30 - (OLTP) 秒殺 - 高并發(fā)單點更新》
這里直接用分區(qū)表寫入的話,,性能會更爽,,原理請看如下:
《阿里云RDS PostgreSQL OSS 外部表 - (dblink異步調(diào)用封裝)并行寫提速案例》
vi test1.sql
\set mov random(0,15)
select consume_tag_log(:mov,10000) where pg_try_advisory_xact_lock(:mov);
nohup pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 16 -j 16 -T 3000 >./consume.log 2>&1 &
6、壓測結(jié)果
寫入速度
單條單步寫入,,約 14.3萬 行/s
改成多表批量寫入,,可以提高到100萬+ 行/s
消費速度
單表并行批量消費,約 25.5萬 行/s
改成多表并行批量消費,,可以提高到 100萬+ 行/s
查詢速度,,毫秒級
postgres=# explain (analyze,verbose,timing,costs,buffers) select count(imei) from tbl3 where tagids @> (array[25281,25288])::int[];
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=224.50..224.51 rows=1 width=8) (actual time=2.745..2.746 rows=1 loops=1)
Output: count(imei)
Buffers: shared hit=193
-> Bitmap Heap Scan on public.tbl3 (cost=218.44..224.49 rows=5 width=33) (actual time=2.716..2.738 rows=9 loops=1)
Output: imei, tagids, ins_tags, del_tags
Recheck Cond: (tbl3.tagids @> '{25281,25288}'::integer[])
Heap Blocks: exact=9
Buffers: shared hit=193
-> Bitmap Index Scan on idx_tbl3_tagids (cost=0.00..218.44 rows=5 width=0) (actual time=2.707..2.707 rows=9 loops=1)
Index Cond: (tbl3.tagids @> '{25281,25288}'::integer[])
Buffers: shared hit=184
Planning time: 0.165 ms
Execution time: 2.797 ms
(13 rows)
除了以上基于數(shù)組、GIN索引的設(shè)計,,PostgreSQL還有一些技術(shù),,可以用在經(jīng)營分析系統(tǒng)。
技術(shù)1 實時透視 - 技術(shù)之 - 流式統(tǒng)計
通過insert on conflict,,流式的統(tǒng)計固定模型的維度數(shù)據(jù),。
《PostgreSQL 流式統(tǒng)計 - insert on conflict 實現(xiàn) 流式 UV(distinct), min, max, avg, sum, count …》
滿足這類查詢的實時流式統(tǒng)計:
select a,count(*),sum(b),avg(b),min(b),max(b) from tbl group by a;
技術(shù)2 實時透視、估算 - 技術(shù)之 - 流式統(tǒng)計 + HLL
通過insert on conflict,,流式的統(tǒng)計固定模型的維度數(shù)據(jù),。這里要用到hll插件,存儲count(dinstinct x)的估值
《PostgreSQL 流式統(tǒng)計 - insert on conflict 實現(xiàn) 流式 UV(distinct), min, max, avg, sum, count …》
《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 3》
《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 2》
《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 1》
滿足這類查詢的實時流式統(tǒng)計:
select a, count(distinct b) from tbl group by a;
技術(shù)3 實時透視,、估算 - 技術(shù)之 - 計劃估算
根據(jù)執(zhí)行計劃得到評估行,。
《妙用explain Plan Rows快速估算行》
如果輸入多個字段條件,,為了提高行估算準確度,,可以定義多字段統(tǒng)計信息,10新增的功能:
《PostgreSQL 10 黑科技 - 自定義統(tǒng)計信息》
滿足這類查詢的估算需求:
select count(*) from tbl where xxxx;
SQL換算成
select * from tbl where xxxx; -- 通過explain的行估算拿結(jié)果
技術(shù)4 實時透視,、估算 - 技術(shù)之 - 采樣估算
《秒級任意維度分析1TB級大表 - 通過采樣估值滿足高效TOP N等統(tǒng)計分析需求》
采樣估算,,適合求TOP N。
滿足這類查詢的估算需求:
select a from tbl group by a order by count(*) desc limit N;
技術(shù)5 實時圈選,、透視 - 技術(shù)之 - GIN倒排
倒排索引針對多值類型,,例如 hstore, array, tsvector, json, jsonb,。
主樹的K-V分別為:
element-ctid(行號)list or tree
輔樹為
ctid list or tree
從而高效的滿足這類查詢的需求:
-- 包含哪些元素
select * from tbl where arr @> array[xx,xx];
-- 包含哪些任意元素之一
select * from tbl where arr && array[xx,xx];
內(nèi)部使用BITMAP掃描方法,過濾到少量數(shù)據(jù)塊,。
《PostgreSQL 9種索引的原理和應(yīng)用場景》
技術(shù)6 實時圈選,、透視 - 技術(shù)之 - bitmap
這個方法非常的巧妙,將tag和imei做了倒轉(zhuǎn),,以tag為key, imei為bitmap來存儲,。
create table tag_users (
tagid int primary key, -- 標簽
imeibitmap varbit, -- 每個imei用一個BIT位表示
);
查詢換算:
-- 包含某些標簽的用戶
select bitand(imeibitmap) from tag_users where tagid in (?,?,...);
-- 包含任意標簽的用戶
select bitor(imeibitmap) from tag_users where tagid in (?,?,...);
案例參考:
《阿里云RDS for PostgreSQL varbitx插件與實時畫像應(yīng)用場景介紹》
《基于 阿里云 RDS PostgreSQL 打造實時用戶畫像推薦系統(tǒng)(varbitx)》
技術(shù)7 實時圈選、透視 - 技術(shù)之 - 并行計算
PostgreSQL 10加入了并行計算的能力,,在join , filter, seqscan, order by, agg, group等方面都支持并行,。
性能指標參考:
《HTAP數(shù)據(jù)庫 PostgreSQL 場景與性能測試之 23 - (OLAP) 并行計算》
技術(shù)8 實時圈選、透視 - 技術(shù)之 - MPP, 列存儲, 位圖索引
基于PostgreSQL的MPP 數(shù)據(jù)倉庫Greenplum,,支持列存儲,,位圖索引。
用資源,,暴力解決問題,。
沒有太多的設(shè)計技巧,堆機器就可以,,但是本身的效率遠比impalar, hive好很多,。
Greenplum是非常值得推薦的OLAP數(shù)據(jù)庫。在金融,、政府,、航空等大數(shù)據(jù)領(lǐng)域有眾多案例。
決策支持技術(shù)
流式數(shù)據(jù)處理+UDF函數(shù)計算技術(shù),??梢詽M足實時決策的需求。
案例如下:
《HTAP數(shù)據(jù)庫 PostgreSQL 場景與性能測試之 32 - (OLTP) 高吞吐數(shù)據(jù)進出(堆存,、行掃,、無需索引) - 閱后即焚(JSON + 函數(shù)流式計算)》
《HTAP數(shù)據(jù)庫 PostgreSQL 場景與性能測試之 27 - (OLTP) 物聯(lián)網(wǎng) - FEED日志, 流式處理 與 閱后即焚 (CTE)》
相似案例舉例
1、實時統(tǒng)計 count(distinct)估值,,min, max, avg, sum, count精確值,。
《PostgreSQL 流式統(tǒng)計 - insert on conflict 實現(xiàn) 流式 UV(distinct), min, max, avg, sum, count …》
2、
《PostgreSQL 異步消息實踐 - Feed系統(tǒng)實時監(jiān)測與響應(yīng)(如 電商主動服務(wù)) - 分鐘級到毫秒級的實現(xiàn)》
《(OLTP) 物聯(lián)網(wǎng) - FEED日志, 流式處理 與 閱后即焚 (CTE)》
3,、讓explain產(chǎn)生精確的多字段輸入條件行數(shù)估值(select * from table where a=? and|or b=? .... )
《PostgreSQL 10 黑科技 - 自定義統(tǒng)計信息》
4,、《恭迎萬億級營銷(圈人)瀟灑的邁入毫秒時代 - 萬億user_tags級實時推薦系統(tǒng)數(shù)據(jù)庫設(shè)計》
《阿里云RDS for PostgreSQL varbitx插件與實時畫像應(yīng)用場景介紹》
《基于 阿里云 RDS PostgreSQL 打造實時用戶畫像推薦系統(tǒng)(varbitx)》
5、決策支持,,流式函數(shù)計算
《(OLTP) 高吞吐數(shù)據(jù)進出(堆存,、行掃、無需索引) - 閱后即焚(JSON + 函數(shù)流式計算)》
《(OLTP) 高吞吐數(shù)據(jù)進出(堆存,、行掃,、無需索引) - 閱后即焚(讀寫大吞吐并測)》
6,、圈人案例
《(OLAP) 用戶畫像圈人場景 - 數(shù)組相交查詢與聚合》
《(OLAP) 用戶畫像圈人場景 - 數(shù)組包含查詢與聚合》
7、時間,、空間,、多維圈人、透視案例
《空間|時間|對象 圈人 + 透視 - 暨PostgreSQL 10與Greenplum的對比和選擇》
《PostgreSQL\GPDB 毫秒級海量 時空數(shù)據(jù)透視 典型案例分享》
《PostgreSQL\GPDB 毫秒級海量 多維數(shù)據(jù)透視 案例分享》
8,、視頻網(wǎng)站透視案例
《音視圖(泛內(nèi)容)網(wǎng)站透視分析 DB設(shè)計 - 阿里云(RDS,、HybridDB) for PostgreSQL最佳實踐》
9、
《畫像圈人 + 人群行為透視》
《奔跑吧,,大屏 - 時間+空間 實時四維數(shù)據(jù)透視》
《數(shù)據(jù)透視 - 商場(如沃爾瑪)選址應(yīng)用》
《海量用戶實時定位和圈人 - 團圓社會公益系統(tǒng)(位置尋人\圈人)》
《萬億級電商廣告 - brin黑科技帶你(最低成本)玩轉(zhuǎn)毫秒級圈人(視覺挖掘姊妹篇) - 阿里云RDS PostgreSQL, HybridDB for PostgreSQL最佳實踐》
《多字段,,任意組合條件查詢(無需建模) - 毫秒級實時圈人 最佳實踐》
10、
《經(jīng)營,、銷售分析系統(tǒng)DB設(shè)計之PostgreSQL, Greenplum - 共享充電寶 案例實踐》
|