一,、大數(shù)據(jù)處理的常用方法 之前在《采集→清洗→處理:基于MapReduce的離線數(shù)據(jù)分析》中已經(jīng)有提及到,這里依然給出下面的圖示: 前面給出的那篇文章是基于MapReduce的離線數(shù)據(jù)分析案例,,其通過對網(wǎng)站產(chǎn)生的用戶訪問日志進(jìn)行處理并分析出該網(wǎng)站在某天的PV,、UV等數(shù)據(jù)。 對應(yīng)上面的圖示,,其走的就是離線處理的數(shù)據(jù)處理方式,,而這里即將要介紹的是另外一條路線的數(shù)據(jù)處理方式,即基于Storm的在線處理,。在下面給出的完整案例中,,我們將會完成下面的幾項(xiàng)工作:
如果你對上面提及的大數(shù)據(jù)組件已經(jīng)有所認(rèn)識,或者對如何構(gòu)建大數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)感興趣,,那么就可以盡情閱讀下面的內(nèi)容了,。 需要注意的是,核心在于如何構(gòu)建實(shí)時(shí)處理系統(tǒng),,而這里給出的案例是實(shí)時(shí)統(tǒng)計(jì)某個網(wǎng)站的PV,、UV,在實(shí)際中,,基于每個人的工作環(huán)境不同,,業(yè)務(wù)不同,因此業(yè)務(wù)系統(tǒng)的復(fù)雜度也不盡相同,,相對來說,,這里統(tǒng)計(jì)PV、UV的業(yè)務(wù)是比較簡單的,,但也足夠讓我們對大數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)有一個基本的,、清晰的了解與認(rèn)識,是的,,它不再那么神秘了,。 二,、實(shí)時(shí)處理系統(tǒng)架構(gòu) 我們的實(shí)時(shí)處理系統(tǒng)整體架構(gòu)如下: 即從上面的架構(gòu)中我們可以看出,其由下面的幾部分構(gòu)成:
從構(gòu)建實(shí)時(shí)處理系統(tǒng)的角度出發(fā),,我們需要做的是讓數(shù)據(jù)在各個不同的集群系統(tǒng)之間打通(從上面的圖示中也能很好地說明這一點(diǎn)),,即需要做各個系統(tǒng)之前的整合,包括Flume與Kafka的整合,,Kafka與Storm的整合,。當(dāng)然,各個環(huán)境是否使用集群,,依個人的實(shí)際需要而定,,在我們的環(huán)境中,F(xiàn)lume,、Kafka,、Storm都使用集群。 三,、Flume+Kafka整合 1整合思路 對于Flume而言,,關(guān)鍵在于如何采集數(shù)據(jù),并且將其發(fā)送到Kafka上,,并且由于我們這里了使用Flume集群的方式,,F(xiàn)lume集群的配置也是十分關(guān)鍵的。而對于Kafka,,關(guān)鍵就是如何接收來自Flume的數(shù)據(jù),。從整體上講,邏輯應(yīng)該是比較簡單的,,在Kafka中創(chuàng)建一個用于我們實(shí)時(shí)處理系統(tǒng)的topic,,然后Flume將其采集到的數(shù)據(jù)發(fā)送到該topic上即可。 2整合過程 整合過程:Flume集群配置與Kafka Topic創(chuàng)建,。 Flume集群配置 在我們的場景中,,兩個Flume Agent分別部署在兩臺Web服務(wù)器上,用來采集Web服務(wù)器上的日志數(shù)據(jù),,然后其數(shù)據(jù)的下沉方式都為發(fā)送到另外一個Flume Agent上,,所以這里我們需要配置三個Flume Agent。
該Flume Agent部署在一臺Web服務(wù)器上,,用來采集產(chǎn)生的Web日志,,然后發(fā)送到Flume Consolidation Agent上,創(chuàng)建一個新的配置文件flume-sink-avro.conf,,其配置內(nèi)容如下: ######################################################### ## ##主要作用是監(jiān)聽文件中的新增數(shù)據(jù),,采集到數(shù)據(jù)之后,輸出到avro ## 注意:Flume agent的運(yùn)行,,主要就是配置source channel sink ## 下面的a1就是agent的代號,,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對于source的配置描述 監(jiān)聽文件中的新增數(shù)據(jù) exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #對于sink的配置描述 使用avro日志做數(shù)據(jù)的消費(fèi) a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #對于channel的配置描述 使用文件做數(shù)據(jù)的臨時(shí)緩存 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #通過channel c1將source r1和sink k1關(guān)聯(lián)起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 配置完成后, 啟動Flume Agent,,即可對日志文件進(jìn)行監(jiān)聽: $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/ 2>&1 &
該Flume Agent用于接收其它兩個Agent發(fā)送過來的數(shù)據(jù),,然后將其發(fā)送到Kafka上,創(chuàng)建一個新的配置文件flume-source_avro-sink_kafka.conf,,配置內(nèi)容如下: ##主要作用是監(jiān)聽目錄中的新增文件,,采集到數(shù)據(jù)之后,輸出到kafka #對于source的配置描述 監(jiān)聽avro a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #對于sink的配置描述 使用kafka做數(shù)據(jù)的消費(fèi) a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = f-k-s a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 #對于channel的配置描述 使用內(nèi)存緩沖區(qū)域做數(shù)據(jù)的臨時(shí)緩存 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 配置完成后,, 啟動Flume Agent,,即可對avro的數(shù)據(jù)進(jìn)行監(jiān)聽: $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/ 2>&1 & Kafka配置 在我們的Kafka中,先創(chuàng)建一個topic,,用于后面接收Flume采集過來的數(shù)據(jù): kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3 3整合驗(yàn)證 啟動Kafka的消費(fèi)腳本: $ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 如果在Web服務(wù)器上有新增的日志數(shù)據(jù),,就會被我們的Flume程序監(jiān)聽到,并且最終會傳輸?shù)降終afka的f-k-stopic中,,這里作為驗(yàn)證,,我們上面啟動的是一個Kafka終端消費(fèi)的腳本,這時(shí)會在終端中看到數(shù)據(jù)的輸出: 這樣的話,,我們的整合就沒有問題,,當(dāng)然Kafka中的數(shù)據(jù)應(yīng)該是由我們的Storm來進(jìn)行消費(fèi)的,這里只是作為整合的一個測試,,下面就會來做Kafka+Storm的整合,。 四、Kafka+Storm整合 Kafka和Storm的整合其實(shí)在Storm的官網(wǎng)上也有非常詳細(xì)清晰的文檔: http://storm./releases/1.0.6/storm-kafka.html 想對其有更多了解的同學(xué)可以參考一下,。 1整合思路 在這次的大數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)的構(gòu)建中,,Kafka相當(dāng)于是作為消息隊(duì)列(或者說是消息中間件)的角色,其產(chǎn)生的消息需要有消費(fèi)者去消費(fèi),,所以Kafka與Storm的整合,,關(guān)鍵在于我們的Storm如何去消費(fèi)Kafka消息topic中的消息(Kafka消息topic中的消息正是由Flume采集而來,現(xiàn)在我們需要在Storm中對其進(jìn)行消費(fèi)),。 在Storm中,,topology是非常關(guān)鍵的概念。 對比MapReduce,,在MapReduce中,,我們提交的作業(yè)稱為一個Job,在一個Job中,,又包含若干個Mapper和Reducer,,正是在Mapper和Reducer中有我們對數(shù)據(jù)的處理邏輯: 在Storm中,我們提交的一個作業(yè)稱為topology,,其又包含了spout和bolt,,在Storm中,,對數(shù)據(jù)的處理邏輯正是在spout和bolt中體現(xiàn): 即在spout中,正是我們數(shù)據(jù)的來源,,又因?yàn)槠鋵?shí)時(shí)的特性,,所以可以把它比作一個“水龍頭”,表示其源源不斷地產(chǎn)生數(shù)據(jù): 所以,,問題的關(guān)鍵是spout如何去獲取來自Kafka的數(shù)據(jù),? 好在,Storm-Kafka的整合庫中提供了這樣的API來供我們進(jìn)行操作,。 2整合過程 整合過程應(yīng)用了KafkaSpout,。在代碼的邏輯中只需要創(chuàng)建一個由Storm-KafkaAPI提供的KafkaSpout對象即可: SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);return new KafkaSpout(spoutConf); 下面給出完整的整合代碼: package cn.xpleaf.bigdata.storm.statics; import kafka.api.OffsetRequest; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; * Kafka和storm的整合,用于統(tǒng)計(jì)實(shí)時(shí)流量對應(yīng)的pv和uv */public class KafkaStormTopology { // static class MyKafkaBolt extends BaseRichBolt { static class MyKafkaBolt extends BaseBasicBolt { * kafkaSpout發(fā)送的字段名為bytes */ @Override public void execute(Tuple input, BasicOutputCollector collector) { byte binary = input.getBinary(0); // 跨jvm傳輸數(shù)據(jù),,接收到的是字節(jié)數(shù)據(jù) // byte bytes = input.getBinaryByField('bytes'); // 這種方式也行 String line = new String(binary); System.out.println(line); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder; * 設(shè)置spout和bolt的dag(有向無環(huán)圖) */ KafkaSpout kafkaSpout = createKafkaSpout; builder.setSpout('id_kafka_spout', kafkaSpout); builder.setBolt('id_kafka_bolt', new MyKafkaBolt) .shuffleGrouping('id_kafka_spout'); // 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,,來指定數(shù)據(jù)的上游組件 // 使用builder構(gòu)建topology StormTopology topology = builder.createTopology; String topologyName = KafkaStormTopology.class.getSimpleName; // 拓?fù)涞拿Q Config config = new Config; // Config對象繼承自HashMap,但本身封裝了一些基本的配置 // 啟動topology,,本地啟動使用LocalCluster,,集群啟動使用StormSubmitter if (args == || args.length < 1)="" {=""> LocalCluster localCluster = new LocalCluster; // 本地開發(fā)模式,創(chuàng)建的對象為LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); * BrokerHosts hosts kafka集群列表 * String topic 要消費(fèi)的topic主題 * String zkRoot kafka在zk中的目錄(會在該節(jié)點(diǎn)目錄下記錄讀取kafka消息的偏移量) * String id 當(dāng)前操作的標(biāo)識id */ private static KafkaSpout createKafkaSpout { String brokerZkStr = 'uplooking01:2181,uplooking02:2181,uplooking03:2181'; BrokerHosts hosts = new ZkHosts(brokerZkStr); // 通過zookeeper中的/brokers即可找到kafka的地址 String topic = 'f-k-s'; String zkRoot = '/' + topic; String id = 'consumer-id'; SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); // 本地環(huán)境設(shè)置之后,,也可以在zk中建立/f-k-s節(jié)點(diǎn),,在集群環(huán)境中,不用配置也可以在zk中建立/f-k-s節(jié)點(diǎn) //spoutConf.zkServers = Arrays.asList(new String[]{'uplooking01', 'uplooking02', 'uplooking03'}); //spoutConf.zkPort = 2181; spoutConf.startOffsetTime = OffsetRequest.LatestTime; // 設(shè)置之后,,剛啟動時(shí)就不會把之前的消費(fèi)也進(jìn)行讀取,,會從最新的偏移量開始讀取 return new KafkaSpout(spoutConf); (上下滑動查看完整代碼) 其實(shí)代碼的邏輯非常簡單,我們只創(chuàng)建了 一個由Storm-Kafka提供的KafkaSpout對象和一個包含我們處理邏輯的MyKafkaBolt對象,,MyKafkaBolt的邏輯也很簡單,,就是把Kafka的消息打印到控制臺上。 需要注意的是,,后面我們分析網(wǎng)站PV,、UV的工作,正是在上面這部分簡單的代碼中完成的,,所以其是非常重要的基礎(chǔ),。 3整合驗(yàn)證 上面的整合代碼,可以在本地環(huán)境中運(yùn)行,,也可以將其打包成jar包上傳到我們的Storm集群中并提交業(yè)務(wù)來運(yùn)行,。如果Web服務(wù)器能夠產(chǎn)生日志,并且前面Flume+Kafka的整合也沒有問題的話,,將會有下面的效果,。 如果是在本地環(huán)境中運(yùn)行上面的代碼,那么可以在控制臺中看到日志數(shù)據(jù)的輸出: 如果是在Storm集群中提交的作業(yè)運(yùn)行,,那么也可以在Storm的日志中看到Web服務(wù)器產(chǎn)生的日志數(shù)據(jù): 這樣的話就完成了Kafka+Storm的整合,。 五,、Storm+Redis整合 1整合思路 其實(shí)所謂Storm和Redis的整合,指的是在我們的實(shí)時(shí)處理系統(tǒng)中的數(shù)據(jù)的落地方式,,即在Storm中包含了我們處理數(shù)據(jù)的邏輯,,而數(shù)據(jù)處理完畢后,產(chǎn)生的數(shù)據(jù)處理結(jié)果該保存到什么地方呢,?顯然就有很多種方式了,關(guān)系型數(shù)據(jù)庫,、NoSQL,、HDFS、HBase等,,這應(yīng)該取決于具體的業(yè)務(wù)和數(shù)據(jù)量,,在這里,我們使用Redis來進(jìn)行最后分析數(shù)據(jù)的存儲,。 所以實(shí)際上做這一步的整合,,其實(shí)就是開始寫我們的業(yè)務(wù)處理代碼了,因?yàn)橥ㄟ^前面Flume-Kafka-Storm的整合,,已經(jīng)打通了整個數(shù)據(jù)的流通路徑,,接下來關(guān)鍵要做的是,在Storm中,,如何處理我們的數(shù)據(jù)并保存到Redis中,。 而在Storm中,spout已經(jīng)不需要我們來寫了(由Storm-Kafka的API提供了KafkaSpout對象),,所以問題就變成,,如何根據(jù)業(yè)務(wù)編寫分析處理數(shù)據(jù)的bolt。 2整合過程 整合過程:編寫Storm業(yè)務(wù)處理Bolt,。 日志分析 我們實(shí)時(shí)獲取的日志格式如下: 其中需要說明的是第二個字段和第三個字段,,因?yàn)樗鼘ξ覀兘y(tǒng)計(jì)PV和UV非常有幫助,它們分別是ip字段和mid字段,,說明如下:
因此,,根據(jù)IP地址,,我們可以通過查詢得到其所在的省份,并且創(chuàng)建一個屬于該省份的變量,,用于記錄pv數(shù),,每來一條屬于該省份的日志記錄,,則該省份的PV就加1,以此來完成pv的統(tǒng)計(jì),。 而對于mid,,我們則可以創(chuàng)建屬于該省的一個set集合,每來一條屬于該省份的日志記錄,,則可以將該mid添加到set集合中,,因?yàn)閟et集合存放的是不重復(fù)的數(shù)據(jù),這樣就可以幫我們自動過濾掉重復(fù)的mid,,根據(jù)set集合的大小,,就可以統(tǒng)計(jì)出UV。 在我們storm的業(yè)務(wù)處理代碼中,,我們需要編寫兩個bolt:
當(dāng)然上面只是說明了整體的思路,實(shí)際上還有很多需要注意的細(xì)節(jié)問題和技巧問題,,這都在我們的代碼中進(jìn)行體現(xiàn),,我在后面寫的代碼中都加了非常詳細(xì)的注釋進(jìn)行說明。 編寫第一個Bolt:ConvertIPBolt 根據(jù)上面的分析,,編寫用于數(shù)據(jù)預(yù)處理的bolt,,代碼如下: package cn.xpleaf.bigdata.storm.statistic; import cn.xpleaf.bigdata.storm.utils.JedisUtil; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import redis.clients.jedis.Jedis; * 日志數(shù)據(jù)預(yù)處理Bolt,實(shí)現(xiàn)功能: * 1.提取實(shí)現(xiàn)業(yè)務(wù)需求所需要的信息:ip地址,、客戶端唯一標(biāo)識mid * 2.查詢IP地址所屬地,,并發(fā)送到下一個Bolt */public class ConvertIPBolt extends BaseBasicBolt { byte binary = input.getBinary(0); String line = new String(binary); String fields = line.split('\t'); if(fields == || fields.length < 10)=""> return; } // 獲取ip和mid String ip = fields[1]; String mid = fields[2]; // 根據(jù)ip獲取其所屬地(省份) String province = ; if (ip != ) { Jedis jedis = JedisUtil.getJedis; province = jedis.hget('ip_info_en', ip); // 需要釋放jedis的資源,否則會報(bào)can not get resource from the pool JedisUtil.returnJedis(jedis); } // 發(fā)送數(shù)據(jù)到下一個bolt,,只發(fā)送實(shí)現(xiàn)業(yè)務(wù)功能需要的province和mid collector.emit(new Values(province, mid)); * 定義了發(fā)送到下一個bolt的數(shù)據(jù)包含兩個域:province和mid declarer.declare(new Fields('province', 'mid')); 編寫第二個Bolt:StatisticBolt 這個bolt包含我們統(tǒng)計(jì)網(wǎng)站PV,、UV的代碼邏輯,因此非常重要,,其代碼如下: import org.apache.storm.Constants; import java.text.SimpleDateFormat; import java.util.*; * 日志數(shù)據(jù)統(tǒng)計(jì)Bolt,,實(shí)現(xiàn)功能: * 1.統(tǒng)計(jì)各省份的PV、UV * 2.以天為單位,,將省份對應(yīng)的PV,、UV信息寫入Redis */public class StatisticBolt extends BaseBasicBolt { Map Map SimpleDateFormat sdf = new SimpleDateFormat('yyyyMMdd'); if (!input.getSourceComponent.equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) { // 如果收到非系統(tǒng)級別的tuple,統(tǒng)計(jì)信息到局部變量mids String province = input.getStringByField('province'); String mid = input.getStringByField('mid'); pvMap.put(province, pvMap.get(province) + 1); // pv+1 if(mid != ) { midsMap.get(province).add(mid); // 將mid添加到該省份所對應(yīng)的set中 } } else { // 如果收到系統(tǒng)級別的tuple,則將數(shù)據(jù)更新到Redis中,,釋放JVM堆內(nèi)存空間 /* * 以 廣東 為例,,其在Redis中保存的數(shù)據(jù)格式如下: * guangdong_pv(Redis數(shù)據(jù)結(jié)構(gòu)為hash) * --20180415 * --pv數(shù) * --20180416 * --pv數(shù) * guangdong_mids_20180415(Redis數(shù)據(jù)結(jié)構(gòu)為set) * --mid * ...... String dateStr = sdf.format(new Date); // 更新pvMap數(shù)據(jù)到Redis中 String pvKey = ; for(String province : pvMap.keySet) { int currentPv = pvMap.get(province); if(currentPv > 0) { // 當(dāng)前map中的pv大于0才更新,否則沒有意義 pvKey = province + '_pv'; String oldPvStr = jedis.hget(pvKey, dateStr); if(oldPvStr == ) { oldPvStr = '0'; } Long oldPv = Long.valueOf(oldPvStr); jedis.hset(pvKey, dateStr, oldPv + currentPv + ''); pvMap.replace(province, 0); // 將該省的pv重新設(shè)置為0 // 更新midsMap到Redis中 String midsKey = ; HashSet for(String province: midsMap.keySet) { midsSet = midsMap.get(province); if(midsSet.size > 0) { // 當(dāng)前省份的set的大小大于0才更新到,,否則沒有意義 midsKey = province + '_mids_' + dateStr; jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()])); midsSet.clear; // 釋放jedis資源 JedisUtil.returnJedis(jedis); System.out.println(System.currentTimeMillis + '------->寫入數(shù)據(jù)到Redis'); * 設(shè)置定時(shí)任務(wù),,只對當(dāng)前bolt有效,系統(tǒng)會定時(shí)向StatisticBolt發(fā)送一個系統(tǒng)級別的tuple public Map Map config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); return config; * 初始化各個省份的pv和mids信息(用來臨時(shí)存儲統(tǒng)計(jì)pv和uv需要的數(shù)據(jù)) */ public StatisticBolt { pvMap = new HashMap<>; midsMap = new HashMap String provinceArray = {'shanxi', 'jilin', 'hunan', 'hainan', 'xinjiang', 'hubei', 'zhejiang', 'tianjin', 'shanghai', 'anhui', 'guizhou', 'fujian', 'jiangsu', 'heilongjiang', 'aomen', 'beijing', 'shaanxi', 'chongqing', 'jiangxi', 'guangxi', 'gansu', 'guangdong', 'yunnan', 'sicuan', 'qinghai', 'xianggang', 'taiwan', 'neimenggu', 'henan', 'shandong', 'shanghai', 'hebei', 'liaoning', 'xizang'}; for(String province : provinceArray) { pvMap.put(province, 0); midsMap.put(province, new HashSet); (上下滑動可查看完整代碼) 編寫Topology 我們需要編寫一個topology用來組織前面編寫的Bolt,,代碼如下: * 構(gòu)建topology */public class StatisticTopology { builder.setBolt('id_convertIp_bolt', new ConvertIPBolt).shuffleGrouping('id_kafka_spout'); // 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,,來指定數(shù)據(jù)的上游組件 builder.setBolt('id_statistic_bolt', new StatisticBolt).shuffleGrouping('id_convertIp_bolt'); // 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,來指定數(shù)據(jù)的上游組件 將上面的程序打包成jar包,,并上傳到我們的集群提交業(yè)務(wù)后,,如果前面的整合沒有問題,并且Web服務(wù)也有Web日志產(chǎn)生,,那么一段時(shí)間后,,我們就可以在Redis數(shù)據(jù)庫中看到數(shù)據(jù)的最終處理結(jié)果,,即各個省份的UV和PV信息: 需要說明的是mid信息是一個set集合,,只要求出該set集合的大小,也就可以求出UV值,。 至此,,準(zhǔn)確來說,我們的統(tǒng)計(jì)PV,、UV的大數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)是構(gòu)建完成了,,處理的數(shù)據(jù)結(jié)果的用途根據(jù)不同的業(yè)務(wù)需求而不同,但是對于網(wǎng)站的PV,、UV數(shù)據(jù)來說,,是非常適合用作可視化處理的,即用網(wǎng)頁動態(tài)將數(shù)據(jù)展示出來,,我們下一步正是要構(gòu)建一個簡單的Web應(yīng)用將PV,、UV數(shù)據(jù)動態(tài)展示出來。 六,、數(shù)據(jù)可視化處理 數(shù)據(jù)可視化處理目前我們需要完成兩部分的工作: 開發(fā)一個Web項(xiàng)目,,能夠查詢Redis中的數(shù)據(jù),同時(shí)提供訪問的頁面 自行開發(fā)或找一個符合我們需求的前端UI,,將Web項(xiàng)目中查詢到的數(shù)據(jù)展示出來 對于Web項(xiàng)目的開發(fā),,因個人的技術(shù)棧能力而異,選擇的語言和技術(shù)也有所不同,,只要能夠達(dá)到我們最終數(shù)據(jù)可視化的目的,,其實(shí)都行的。這個項(xiàng)目中我們要展示的是PV和UV數(shù)據(jù),難度不大,,因此可以選擇Java Web,,如Servlet、SpringMVC等,,或者Python Web,,如Flask、Django等,,F(xiàn)lask我個人非常喜歡,,因?yàn)殚_發(fā)非常快,,但因?yàn)榍懊嬉恢庇玫氖荍ava,,因此這里我還是選擇使用SpringMVC來完成。 至于UI這一塊,,我前端能力一般,,普通的開發(fā)沒有問題,但是要做出像上面這種地圖類型的UI界面來展示數(shù)據(jù)的話,,確實(shí)有點(diǎn)無能為力,。好在現(xiàn)在第三方的UI框架比較多,對于圖表類展示的,,比如就有highcharts和echarts,,其中echarts是百度開源的,有豐富的中文文檔,,非常容易上手,,所以這里我選擇使用echarts來作為UI,并且其剛好就有能夠滿足我們需求的地圖類的UI組件,。 因?yàn)殡y度不大,,具體的開發(fā)流程的這里就不提及了,有興趣的同學(xué)可以直接參考后面我提供的源代碼,,這里我們就直接來看一下效果好了,。 因?yàn)閷?shí)際上在本次項(xiàng)目案例中,這一塊的代碼也是非常少的,,使用SpringMVC開發(fā)的話,,只要把JavaEE三層構(gòu)架搭起來了,把依賴引入好了,,后面的開發(fā)確實(shí)不難的,;而如果有同學(xué)會Flask或者Django的話,其項(xiàng)目本身的構(gòu)建和代碼上也都會更容易,。 啟動我們的Web項(xiàng)目后,,輸入地址就可以訪問到數(shù)據(jù)的展示界面了: 可以看到,,echarts的這個UI還是比較好看的,而且也真的能夠滿足我們的需求,。每個省份上的兩個不同顏色的點(diǎn)表示目前我們需要展示的數(shù)據(jù)有兩種,,分別為PV和UV,在左上角也有體現(xiàn),,而顏色的深淺就可以體現(xiàn)PV或者UV的數(shù)量大小關(guān)系了,。 在這個界面上,點(diǎn)擊左上角的UV,,表示不查看UV的數(shù)據(jù),,這樣我們就會只看到PV的情況: 當(dāng)然,也可以只查看UV的情況: 當(dāng)鼠標(biāo)停留在某個省份上時(shí),,就可以查看這個省份具體的PV或UV值,,比如下面我們把鼠標(biāo)停留在“廣東”上時(shí),就可以看到其此時(shí)的PV值為170,,查看其它省份的也是如此: 那么數(shù)據(jù)是可以查看了,,又怎么體現(xiàn)動態(tài)呢? 對于頁面數(shù)據(jù)的動態(tài)刷新有兩種方案,,一種是定時(shí)刷新頁面,,另外一種則是定時(shí)向后端異步請求數(shù)據(jù)。 目前我采用的是第一種,,頁面定時(shí)刷新,,有興趣的同學(xué)也可以嘗試使用第二種方法,,只需要在后端開發(fā)相關(guān)的返回JSON數(shù)據(jù)的API即可,。 七、總結(jié) 那么至此,,從整個大數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)的構(gòu)建到最后的數(shù)據(jù)可視化處理工作,,我們都已經(jīng)完成了,可以看到整個過程下來涉及到的知識層面還是比較多的,,不過我個人覺得,,只要把核心的原理牢牢掌握了,對于大部分情況而言,,環(huán)境的搭建以及基于業(yè)務(wù)的開發(fā)都能夠很好地解決,。 寫此文,一來是對自己實(shí)踐中的一些總結(jié),,二來也是希望把一些比較不錯的項(xiàng)目案例分享給大家,,總之希望能夠?qū)Υ蠹矣兴鶐椭?/p> 項(xiàng)目案例涉及到的代碼我已經(jīng)上傳到GitHub上面,分為兩個,,一個是storm的項(xiàng)目代碼,,另外一個是數(shù)據(jù)可視化處理的代碼,如下: storm-statistic: https://github.com/xpleaf/storm-statistic dynamic-show: https://github.com/xpleaf/dynamic-show 作者:xpleaf |
|