51Reboot 將在 2020.1.16日 21:00 為您帶來分享主題《大佬教你如何從 ES 初學(xué)者到 ES專家》
直播鏈接(提前報(bào)名):https://ke.qq.com/course/482014?taid=4309905192737502&tuin=31589b0e
第一篇文章主要是關(guān)于整體架構(gòu)以及用到的軟件的一些介紹,,這一篇文章是對(duì)各個(gè)軟件的使用介紹,當(dāng)然這里主要是關(guān)于架構(gòu)中我們 agent 的實(shí)現(xiàn)用到的內(nèi)容
關(guān)于 zookeeper+kafka
我們需要先把兩者啟動(dòng),,先啟動(dòng) zookeeper,再啟動(dòng) kafka
啟動(dòng) ZooKeeper:./bin/zkServer.sh start
啟動(dòng) kafka:./bin/kafka-server-start.sh ./config/server.properties
操作 kafka 需要安裝一個(gè)包:go get github.com/Shopify/sarama
寫一個(gè)簡單的代碼,,通過 go 調(diào)用往 kafka 里扔數(shù)據(jù):
package main
import (
"github.com/Shopify/sarama"
"fmt"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test,my message is zhaofan")
client,err := sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config)
if err != nil{
fmt.Println("producer close err:",err)
return
}
defer client.Close()
pid,offset,err := client.SendMessage(msg)
if err != nil{
fmt.Println("send message failed,",err)
return
}
fmt.Printf("pid:%v offset:%v\n",pid,offset)
}
config.Producer.RequiredAcks = sarama.WaitForAll 這里表示是在給kafka扔數(shù)據(jù)的時(shí)候是否需要確認(rèn)收到kafka的ack消息
msg.Topic = "nginx_log" 因?yàn)閗afka是一個(gè)分布式系統(tǒng),,假如我們要讀的是nginx日志,apache日志,,我們可以根據(jù)topic做區(qū)分,,同時(shí)也是我們也可以有不同的分區(qū)
我們將上述代碼執(zhí)行一下,,就會(huì)往kafka中扔一條消息,可以通過kakfa中自帶的消費(fèi)者命令查看:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nginx_log --from-beginning
我們可以將最后的代碼稍微更改一下,,更改為循環(huán)發(fā)送:
for{
pid,offset,err := client.SendMessage(msg)
if err != nil{
fmt.Println("send message failed,",err)
return
}
fmt.Printf("pid:%v offset:%v\n",pid,offset)
time.Sleep(2*time.Second)
}
這樣當(dāng)我們再次執(zhí)行的程序的時(shí)候,,我們可以看到客戶端在不停的消費(fèi)到數(shù)據(jù):
這樣我們就實(shí)現(xiàn)一個(gè)kakfa的生產(chǎn)者的簡單的demo
接下來我們還需要知道一個(gè)工具的使用tailf
tailf
我們的agent需要讀日志目錄下的日志文件,而日志文件是不停的增加并且切換文件的,,所以我們就需要借助于tailf這個(gè)包來讀文件,,當(dāng)然這里的tailf和linux里的tail -f命令雖然不同,但是效果是差不多的,,都是為了獲取日志文件新增加的內(nèi)容,。
而我們的客戶端非常重要的一個(gè)地方就是要讀日志文件并且將讀到的日志文件推送到kafka
這里需要我們下載一個(gè)包:go get github.com/hpcloud/tail
我們通過下面一個(gè)例子對(duì)這個(gè)包進(jìn)行一個(gè)基本的使用,更詳細(xì)的api說明看:https:///github.com/hpcloud/tail
package main
import (
"github.com/hpcloud/tail"
"fmt"
"time"
)
func main() {
filename := "/Users/zhaofan/go_project/src/go_dev/13/tailf/my.log"
tails,err := tail.TailFile(filename,tail.Config{
ReOpen:true,
Follow:true,
Location:&tail.SeekInfo{Offset:0,Whence:2},
MustExist:false,
Poll:true,
})
if err !=nil{
fmt.Println("tail file err:",err)
return
}
var msg *tail.Line
var ok bool
for true{
msg,ok = <-tails.Lines
if !ok{
fmt.Printf("tail file close reopen,filenam:%s\n",tails,filename)
time.Sleep(100*time.Millisecond)
continue
}
fmt.Println("msg:",msg.Text)
}
}
最終實(shí)現(xiàn)的效果是當(dāng)你文件里面添加內(nèi)容后,,就可以不斷的讀取文件中的內(nèi)容
日志庫的使用
這里是通過beego的日志庫實(shí)現(xiàn)的,,beego的日志庫是可以單獨(dú)拿出來用的,還是非常方便的,,使用例子如下:
package main
import (
"github.com/astaxie/beego/logs"
"encoding/json"
"fmt"
)
func main() {
config := make(map[string]interface{})
config["filename"] = "/Users/zhaofan/go_project/src/go_dev/13/log/logcollect.log"
config["level"] = logs.LevelTrace
configStr,err := json.Marshal(config)
if err != nil{
fmt.Println("marshal failed,err:",err)
return
}
logs.SetLogger(logs.AdapterFile,string(configStr))
logs.Debug("this is a debug,my name is %s","stu01")
logs.Info("this is a info,my name is %s","stu02")
logs.Trace("this is trace my name is %s","stu03")
logs.Warn("this is a warn my name is %s","stu04")
}
簡單版本logagent的實(shí)現(xiàn)
這里主要是先實(shí)現(xiàn)核心的功能,,后續(xù)再做優(yōu)化和改進(jìn),主要實(shí)現(xiàn)能夠根據(jù)配置文件中配置的日志路徑去讀取日志并將讀取的實(shí)時(shí)推送到kafka消息隊(duì)列中
關(guān)于logagent的主要結(jié)構(gòu)如下:
程序目錄結(jié)構(gòu)為:
├── conf
│ └── app.conf
├── config.go
├── kafka.go
├── logs
│ └── logcollect.log
├── main.go
└── server.go
app.conf :配置文件
config.go:用于初始化讀取配置文件中的內(nèi)容,,這里的配置文件加載是通過之前自己實(shí)現(xiàn)的配置文件熱加載包處理的,,博客地址:http://www.cnblogs.com/zhaof/p/8593204.html
logcollect.log:日志文件
kafka.go:對(duì)kafka的操作,包括初始化kafka連接,,以及給kafka發(fā)送消息
server.go:主要是tail 的相關(guān)操作,,用于去讀日志文件并將內(nèi)容放到channel中
所以這里我們主要的代碼邏輯或者重要的代碼邏輯就是kafka.go 以及server.go
kafka.go代碼內(nèi)容為:
// 這里主要是kafak的相關(guān)操作,包括了kafka的初始化,,以及發(fā)送消息的操作
package main
import (
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
var (
client sarama.SyncProducer
kafkaSender *KafkaSender
)
type KafkaSender struct {
client sarama.SyncProducer
lineChan chan string
}
// 初始化kafka
func NewKafkaSender(kafkaAddr string)(kafka *KafkaSender,err error){
kafka = &KafkaSender{
lineChan:make(chan string,100000),
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client,err := sarama.NewSyncProducer([]string{kafkaAddr},config)
if err != nil{
logs.Error("init kafka client failed,err:%v\n",err)
return
}
kafka.client = client
for i:=0;i<appConfig.KafkaThreadNum;i++{
// 根據(jù)配置文件循環(huán)開啟線程去發(fā)消息到kafka
go kafka.sendToKafka()
}
return
}
func initKafka()(err error){
kafkaSender,err = NewKafkaSender(appConfig.kafkaAddr)
return
}
func (k *KafkaSender) sendToKafka(){
//從channel中讀取日志內(nèi)容放到kafka消息隊(duì)列中
for v := range k.lineChan{
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder(v)
_,_,err := k.client.SendMessage(msg)
if err != nil{
logs.Error("send message to kafka failed,err:%v",err)
}
}
}
func (k *KafkaSender) addMessage(line string)(err error){
//我們通過tailf讀取的日志文件內(nèi)容先放到channel里面
k.lineChan <- line
return
}
server.go的代碼為:
package main
import (
"github.com/hpcloud/tail"
"fmt"
"sync"
"github.com/astaxie/beego/logs"
"strings"
)
type TailMgr struct {
//因?yàn)槲覀兊腶gent可能是讀取多個(gè)日志文件,,這里通過存儲(chǔ)為一個(gè)map
tailObjMap map[string]*TailObj
lock sync.Mutex
}
type TailObj struct {
//這里是每個(gè)讀取日志文件的對(duì)象
tail *tail.Tail
offset int64 //記錄當(dāng)前位置
filename string
}
var tailMgr *TailMgr
var waitGroup sync.WaitGroup
func NewTailMgr()(*TailMgr){
tailMgr = &TailMgr{
tailObjMap:make(map[string]*TailObj,16),
}
return tailMgr
}
func (t *TailMgr) AddLogFile(filename string)(err error){
t.lock.Lock()
defer t.lock.Unlock()
_,ok := t.tailObjMap[filename]
if ok{
err = fmt.Errorf("duplicate filename:%s\n",filename)
return
}
tail,err := tail.TailFile(filename,tail.Config{
ReOpen:true,
Follow:true,
Location:&tail.SeekInfo{Offset:0,Whence:2},
MustExist:false,
Poll:true,
})
tailobj := &TailObj{
filename:filename,
offset:0,
tail:tail,
}
t.tailObjMap[filename] = tailobj
return
}
func (t *TailMgr) Process(){
//開啟線程去讀日志文件
for _, tailObj := range t.tailObjMap{
waitGroup.Add(1)
go tailObj.readLog()
}
}
func (t *TailObj) readLog(){
//讀取每行日志內(nèi)容
for line := range t.tail.Lines{
if line.Err != nil {
logs.Error("read line failed,err:%v",line.Err)
continue
}
str := strings.TrimSpace(line.Text)
if len(str)==0 || str[0] == '\n'{
continue
}
kafkaSender.addMessage(line.Text)
}
waitGroup.Done()
}
func RunServer(){
tailMgr = NewTailMgr()
// 這一部分是要調(diào)用tailf讀日志文件推送到kafka中
for _, filename := range appConfig.LogFiles{
err := tailMgr.AddLogFile(filename)
if err != nil{
logs.Error("add log file failed,err:%v",err)
continue
}
}
tailMgr.Process()
waitGroup.Wait()
}
可以整體演示一下代碼實(shí)現(xiàn)的效果,當(dāng)我們運(yùn)行程序之后我配置文件配置的目錄為:
log_files=/app/log/a.log,/Users/zhaofan/a.log
我通過一個(gè)簡單的代碼對(duì)對(duì)a.log循環(huán)追加內(nèi)容,你可以從kafka的客戶端消費(fèi)力看到內(nèi)容了:
完成的代碼地址:https://github.com/pythonsite/logagent
小結(jié)
這次只是實(shí)現(xiàn)logagent的核心功能,,實(shí)現(xiàn)了從日志文件中通過多個(gè)線程獲取要讀的日志內(nèi)容,,這里借助了tailf,并將獲取的內(nèi)容放到channel中,,kafka.go會(huì)從channel中取出日志內(nèi)容并放到kafka的消息隊(duì)列中
這里并沒有做很多細(xì)致的處理,,下一篇文章會(huì)在這個(gè)代碼的基礎(chǔ)上進(jìn)行改進(jìn)。同時(shí)現(xiàn)在的配置文件的方式也不是最佳的,,每次改動(dòng)配置文件都需要重新啟動(dòng)程序,,后面將通過etcd的方式。
作者:coder
原文鏈接:https://www.cnblogs.com/zhaof/p/8673420.html
|