- 解壓縮 : tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/ ;
- 重命名 : mv /opt/apache-flume-1.6.0-bin/ /opt/flume(可省略) ;
- 復(fù)制配置文件 : cp conf/flume-env.sh.template conf/flume-env.sh ;
- 修改conf/flume-env.sh : JAVA_HOME ;
- 復(fù)制flume到其他節(jié)點(diǎn) : scp -r …… 。
常見架構(gòu)
實(shí)戰(zhàn)需求
- A、B兩臺機(jī)器實(shí)時(shí)生產(chǎn)日志,主要類型為
access.log 、ugcheader.log 、ugctail.log , 要求:
- 把A、B 機(jī)器中的access.log、ugcheader.log、ugctail.log 匯總到C機(jī)器上,然后統(tǒng)一收集到hdfs和Kafka中。
- 在hdfs中要求的目錄為:用作離線統(tǒng)計(jì),。
/source/access/2016-01-01/ /source/ugcheader/2016-01-01/ /source/ugctail/2016-01-01/
- Kafka分topic , 用作實(shí)時(shí)分析。
畫圖
準(zhǔn)備
- 啟動(dòng)zookeeper : /opt/zookeeper/bin/zkServer.sh start
- 啟動(dòng)kafka : nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties >>/opt/logs/kafka-server.log 2>&1 &
- 啟動(dòng)hdfs : start-dfs.sh
- 這里查看一下xxo10的進(jìn)程 :
1 2 3 4 5 6
| [root@xxo10 flume]# jps 1305 Kafka 1252 QuorumPeerMain 1786 Jps 1542 DataNode 1454 NameNode
|
- 創(chuàng)建topic
1 2 3 4 5 6 7
| [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic access [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic ugchead [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic ugctail [root@xxo08 kafka]# bin/kafka-topics.sh --list --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 ###查看 access ugchead ugctail
|
C機(jī)器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| [root@xxo10 flume]# vim conf/hdfs_kafka.conf
#################################### C機(jī)器 ######################################### #################################### 兩個(gè)channel、兩個(gè)sink ##########################
# Name the components on this agent a1.sources = r1 a1.sinks = kfk fs a1.channels = c1 c2
# avro source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444
# source r1定義攔截器,為消息添加時(shí)間戳 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# kfk sink a1.sinks.kfk.type = org.apache.flume.sink.kafka.KafkaSink #a1.sinks.kfk.topic = mytopic a1.sinks.kfk.brokerList = xxo08:9092,xxo09:9092,xxo10:9092
# hdfs sink a1.sinks.fs.type = hdfs a1.sinks.fs.hdfs.path = hdfs://xxo10:9000/source/%{type}/%Y%m%d a1.sinks.fs.hdfs.filePrefix = events- a1.sinks.fs.hdfs.fileType = DataStream #a1.sinks.fs.hdfs.fileType = CompressedStream #a1.sinks.fs.hdfs.codeC = gzip #不按照條數(shù)生成文件 a1.sinks.fs.hdfs.rollCount = 0 #如果壓縮存儲的話,HDFS上的文件達(dá)到64M時(shí)生成一個(gè)文件(注意是按壓縮前大小64M生成一個(gè)文件,然后壓縮存儲)。 a1.sinks.fs.hdfs.rollSize = 67108864 a1.sinks.fs.hdfs.rollInterval = 0
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000
a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 1000
# Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.kfk.channel = c1 a1.sinks.fs.channel = c2
|
- 啟動(dòng)
1 2 3 4 5
| [root@xxo10 ~]# cd /opt/apache-flume/ [root@xxo10 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/hdfs_kafka.conf --name a1 -Dflume.root.logger=INFO,console & ...... ...... ......Component type: SINK, name: kfk started ##啟動(dòng)成功
|
A機(jī)器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| [root@xxo08 flume]# vim conf/hdfs_kafka.conf
#################################### A機(jī)器 ######################################### #################################### 3個(gè)source ##################################### #################################### 2個(gè)攔截器 ###################################### # Name the components on this agent a1.sources = access ugchead ugctail a1.sinks = k1 a1.channels = c1
# 三個(gè)sources a1.sources.access.type = exec a1.sources.access.command = tail -F /root/data/access.log
a1.sources.ugchead.type = exec a1.sources.ugchead.command = tail -F /root/data/ugchead.log
a1.sources.ugctail.type = exec a1.sources.ugctail.command = tail -F /root/data/ugctail.log
# sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = xxo10 a1.sinks.k1.port = 44444
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000
# interceptor a1.sources.access.interceptors = i1 i2 a1.sources.access.interceptors.i1.type=static a1.sources.access.interceptors.i1.key = type a1.sources.access.interceptors.i1.value = access a1.sources.access.interceptors.i2.type=static a1.sources.access.interceptors.i2.key = topic a1.sources.access.interceptors.i2.value = access
a1.sources.ugchead.interceptors = i1 i2 a1.sources.ugchead.interceptors.i1.type=static a1.sources.ugchead.interceptors.i1.key = type a1.sources.ugchead.interceptors.i1.value = ugchead a1.sources.ugchead.interceptors.i2.type=static a1.sources.ugchead.interceptors.i2.key = topic a1.sources.ugchead.interceptors.i2.value = ugchead
a1.sources.ugctail.interceptors = i1 i2 a1.sources.ugctail.interceptors.i1.type=static a1.sources.ugctail.interceptors.i1.key = type a1.sources.ugctail.interceptors.i1.value = ugctail a1.sources.ugctail.interceptors.i2.type=static a1.sources.ugctail.interceptors.i2.key = topic a1.sources.ugctail.interceptors.i2.value = ugctail
# Bind the source and sink to the channel a1.sources.access.channels = c1 a1.sources.ugchead.channels = c1 a1.sources.ugctail.channels = c1 a1.sinks.k1.channel = c1
|
- 啟動(dòng)A機(jī)器
1
| [root@xxo08 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/hdfs_kafka.conf --name a1 -Dflume.root.logger=INFO,console &
|
驗(yàn)證功能
- 我這里啟動(dòng)了一個(gè)向
access.log、ugcheader.log、ugctail.log 添加數(shù)據(jù)的java程序:1
| [root@xxo08 ~]# java -cp KafkaFlumeProject_20160519-1.0-SNAPSHOT-jar-with-dependencies.jar com.xxo.utils.Creator
|
查看hdfs的情況
1 2 3 4 5 6
| [root@xxo10 ~]# hdfs dfs -text /source/ugchead/20160523/* | more 1001 221.8.9.6 80 be83f3fd-a218-4f98-91d8-6b4f0bb4558b 750b6203-4a7d-42d5-82e4-906415b70f63 10207 {"ugctype":"consumer", "userId":"40604","coin":"10","number":"2"} 1463685721663 1003 218.75.100.114 ea11f1d2-680d-4645-a52e-74d5f2317dfd 8109eda1-aeac-43fe-94b1-85d2d1934913 20101 {"ugctype":"fav","user Id":"40604","item":"13"} 1463685722666 ......
|
kafka消費(fèi)者
1 2 3 4
| ########################## 這里查看一下access ############################### [root@xxo09 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic access --from-beginning 1001 218.26.219.186 070c8525-b857-414d-98b6-13134da08401 10201 0 GET /tologin HTTP/1.1 408 /update/pass Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1463676319717 ......
|
查看日志: tac /opt/flume/logs/flume.log | more
同步節(jié)點(diǎn)
- 【B機(jī)器】
1 2 3 4 5 6
| ####################### 拷貝 ################################## [root@xxo08 flume]# scp /opt/flume/conf/hdfs_kafka.conf root@xxo09:/opt/flume/conf/ hdfs_kafka.conf 100% 1803 1.8KB/s 00:00
####################### 遠(yuǎn)程啟動(dòng) ############################### [root@xxo08 flume]# ssh root@xxo09 "/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/ --conf-file /opt/flume/conf/hdfs_kafka.conf --name a1" &
|
|