# Introduction
- Kafka is a distributed system.
- We use a third-party library to operate Kafka:
  - `import "github.com/Shopify/sarama"`
- A client that puts data into Kafka is called a **producer**.
- A client that connects to Kafka and reads (consumes) data from it is called a **consumer**.
- The tailf library:
  - It is a third-party Go library (not part of the standard library).
  - It can continuously pull new data out of a file that is still being written to.
  - `import "github.com/hpcloud/tail"`
# Example: sending data to Kafka with Sarama

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Wait until all in-sync replicas have acknowledged the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Required by SyncProducer: report successful deliveries.
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("new sync producer failed, err:", err)
		return
	}
	defer client.Close()

	msg := &sarama.ProducerMessage{}
	msg.Topic = "nginx_log"
	msg.Value = sarama.StringEncoder("this is a test message")

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed:", err)
		return
	}
	fmt.Printf("pid %v offset %v\n", pid, offset)
}
```
- When sending messages to Kafka through this library, sarama waits for an ack packet: once Kafka has received the message and written it to its log file, it sends an ack back to sarama, confirming that the message was not lost and has been persisted in Kafka.
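The ack behaviour is controlled by `Producer.RequiredAcks`. Sarama exposes three levels; the sketch below just sets each in turn to show the trade-off (it assigns all three for illustration, a real producer picks one):

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// NoResponse: fire-and-forget; fastest, but messages can be lost.
	config.Producer.RequiredAcks = sarama.NoResponse

	// WaitForLocal: the partition leader acks after its own local write.
	config.Producer.RequiredAcks = sarama.WaitForLocal

	// WaitForAll: all in-sync replicas must confirm; slowest but safest
	// (this is what the producer example above uses).
	config.Producer.RequiredAcks = sarama.WaitForAll

	fmt.Println("acks level:", config.Producer.RequiredAcks)
}
```

Higher ack levels trade throughput for durability; `WaitForAll` is the usual choice for a log-collection pipeline where losing lines is unacceptable.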
## Testing consumption with the Kafka CLI
- Use `kafka-console-consumer` to consume the data from a terminal:

```shell
kafka-console-consumer --bootstrap-server localhost:9092 --topic nginx_log --from-beginning
```

- Result: the data is consumed successfully.
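The same check can be done from Go with sarama's partition consumer. A minimal sketch, assuming a single-partition `nginx_log` topic on the same local broker (it reads partition 0 from the oldest offset, like `--from-beginning` does):

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// nil config uses sarama's defaults.
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Println("new consumer failed:", err)
		return
	}
	defer consumer.Close()

	// Consume partition 0 of the topic from the oldest retained offset.
	pc, err := consumer.ConsumePartition("nginx_log", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Println("consume partition failed:", err)
		return
	}
	defer pc.Close()

	// Messages() is a channel; this loop blocks waiting for new messages.
	for msg := range pc.Messages() {
		fmt.Printf("partition:%d offset:%d value:%s\n",
			msg.Partition, msg.Offset, string(msg.Value))
	}
}
```

A topic with several partitions would need one `ConsumePartition` per partition (or a consumer group); the single-partition form is enough to verify the producer example above.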
# tailf
- It lets the program call tailf directly, rather than spawning a separate `tail` process as is traditionally done on Linux.
- Use `tail.TailFile(filename, config)`:
  - `filename`: the file to be tailed.
  - `config`: an instance of `tail.Config`, a struct. Notable fields:
    - `Location`: records the position in the file to start reading from.
    - `Poll`: whether to keep polling the file for changes.
```go
package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() {
	fileName := "./my.log"
	tails, err := tail.TailFile(fileName, tail.Config{
		ReOpen:    true,                                 // reopen the file if it is recreated (log rotation)
		Follow:    true,                                 // keep reading as the file grows
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // start from the end of the file
		MustExist: false,                                // do not fail if the file does not exist yet
		Poll:      true,                                 // poll for changes instead of using inotify
	})
	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}

	var msg *tail.Line
	var ok bool
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Printf("tail file closed, reopening, filename: %s\n", tails.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		fmt.Println("msg:", msg.Text)
	}
}
```

1. In a real deployment you also need to handle signals (for a graceful shutdown).

# Configuration file (config) library
- It supports configuration files in multiple formats, e.g. yaml, ini, etc.
- `go get github.com/astaxie/beego/config`
- It is one of the base libraries of the beego web framework and can be used on its own.
- `import "github.com/astaxie/beego/config"`
- For now we use the config library to keep the configuration in a file; later, **etcd** can be used to manage the configuration through an online web interface with changes taking effect in real time.

## A first look at the ini format
- An ini configuration file is divided into **sections and items** `(item_name = item_value)`:
```ini
[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[logs]
log_level = debug
log_path = ./logs/logagent.log

[collect]
log_path = /home/work/logs/nginx/access.log
topic = nginx_log
```

## Reading the configuration file

```go
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

func main() {
	// Create a new config-reader instance.
	conf, err := config.NewConfig("ini", "./logagent.conf")
	if err != nil {
		fmt.Println("new config failed:", err)
		return
	}

	port, err := conf.Int("server::listen_port")
	if err != nil {
		fmt.Println("read listen_port failed:", err)
		return
	}
	fmt.Println("port:", port)

	logLevel := conf.String("logs::log_level")
	fmt.Println("log_level:", logLevel)
}
```
- `NewConfig(adapterName, fileName)`:
  - `adapterName`: the format of the configuration file to read (e.g. `"ini"`).
  - `fileName`: the name of the configuration file to read.
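When a key might be absent, the Configer returned by `NewConfig` also has `Default*` helpers that fall back to a given value instead of returning an error. A sketch, assuming the `logagent.conf` shown above and the beego config `DefaultInt`/`DefaultString` API:

```go
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

func main() {
	conf, err := config.NewConfig("ini", "./logagent.conf")
	if err != nil {
		fmt.Println("new config failed:", err)
		return
	}

	// Fall back to the second argument when the key is missing,
	// rather than handling an error per key.
	port := conf.DefaultInt("server::listen_port", 8080)
	topic := conf.DefaultString("collect::topic", "nginx_log")
	fmt.Println("port:", port, "topic:", topic)
}
```

This keeps startup code short when most settings have sensible defaults; reserve the error-returning `Int`/`String` forms for settings that must be present.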
# Logging (log) library
- Configure the log library with `logs.SetLogger(logs.AdapterFile, config)`.
  - The `config` parameter is a JSON string.
  - Usually the JSON is built with `json.Marshal` (safer) rather than by hand-constructing the string.
- `go get github.com/astaxie/beego/logs`
- `import "github.com/astaxie/beego/logs"`
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

func main() {
	config := make(map[string]interface{})
	config["filename"] = "./my.log"
	config["level"] = logs.LevelDebug

	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("config marshal failed:", err)
		return
	}

	err = logs.SetLogger(logs.AdapterFile, string(configStr))
	if err != nil {
		fmt.Println("SetLogger failed:", err)
		return
	}

	logs.Debug("this is a test my name is %s", "stu01")
	logs.Trace("this is a trace my name is %s", "stu02")
	logs.Warn("this is a warn, my name is %s", "stu03")
}
```
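Beyond `SetLogger`, the logs package exposes package-level knobs for filtering and buffering. The sketch below assumes beego logs' `SetLevel` and `Async` helpers exist with these names; verify against the installed version:

```go
package main

import "github.com/astaxie/beego/logs"

func main() {
	// Only log Warning and above; Debug calls become no-ops.
	logs.SetLevel(logs.LevelWarning)

	// Switch to asynchronous logging through an internal buffered channel,
	// so log calls do not block on disk writes.
	logs.Async()

	logs.Debug("dropped at LevelWarning")
	logs.Warn("still written")
}
```

With async logging enabled, make sure pending messages are flushed before the process exits, or the last few lines may be lost.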