Kafka日志收集实现
- 使用github.com/Shopify/sarama连接kafka,并往其中写数据
- 使用github.com/hpcloud/tail读取日志文件
- 使用zookeeper做集群管理
- 使用ini做配置文件解析
关于Kafka原理及工作流程见https://blog.csdn.net/wzb_wzt/article/details/107367245
初始化kafka连接、以及往kafka发送数据的方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41package kafka import ( "fmt" "github.com/Shopify/sarama" ) //专门往kafka写日志的模块 //sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会报错 var ( client sarama.SyncProducer ) // Init 初始化生产者 func Init(addrs []string)(err error){ config:=sarama.NewConfig() config.Producer.RequiredAcks=sarama.WaitForAll //ACK反馈机制,all(需要leader 和follow都确认了) config.Producer.Partitioner=sarama.NewRandomPartitioner//指定写往哪个分区 新选出一个partition config.Producer.Return.Successes=true//成功交付的信息将在success channel返回 //连接kafka client, err = sarama.NewSyncProducer(addrs, config) if err != nil { fmt.Printf("producer close the err is %v",err) return } //defer client.Close() //不需要关闭 日志每时都在产生,所以不需要关闭 return } func SendToKafka(topic,data string)(err error){ //构造一个消息 msg:=&sarama.ProducerMessage{} msg.Topic=topic msg.Value=sarama.StringEncoder(data) //发送消息 message, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed err:",err) return err } fmt.Printf("pid:%v;offset:%vn",message,offset) return }
初始化日志文件,读取文件数据到通道
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32package taillog import ( "github.com/hpcloud/tail" ) var ( tailObj *tail.Tail ) //Init 打开日志文件初始化 func Init(fileName string)(err error){ config := tail.Config{ ReOpen: true,//重新打开 切换文件时,创建新文件 Follow: true,//跟随文件 Location: &tail.SeekInfo{Offset: 0, Whence: 2},//从文件哪个地方开始读 MustExist: false,//文件不存在不报错 Poll: true,// } tailObj, err = tail.TailFile(fileName, config) if err != nil { return } return } //ReadChan 循环读取文件 func ReadChan() <-chan *tail.Line{ return tailObj.Lines }
主函数调用
- 加载配置文件信息;采用ini来读取配置信息并与结构体绑定
- 初始化kafka连接
- 初始化日志文件
- 循环读取文件信息并写入Kafka
复制代码
1
2
3
4
5
6
7//配置文件 [kafka] addrs=127.0.0.1:9092 topic=web_log [taillog] path=./my.log
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17//配置文件的结构体 //整个配置文件 package config type AppConf struct { KafkaConf `ini:"kafka"` TailConf `ini:"taillog"` } //Kafka 配置文件结构体 type KafkaConf struct { Address string `ini:"addrs"` Topic string`ini:"topic"` } //Tail 配置文件结构体 type TailConf struct { Path string `ini:"path"` }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49package main import ( "fmt" "gopkg.in/ini.v1" "log" "logagent/config" "logagent/kafka" "logagent/taillog" "time" ) //logAgent 入口程序 var ( iniFile =new(config.AppConf) ) func run(){ //1读取文件 for{ select { //2发送到kafka case line:=<-taillog.ReadChan(): kafka.SendToKafka(iniFile.Topic,line.Text) default: time.Sleep(time.Second) } } } func main(){ //0.加载配置文件 err := ini.MapTo(iniFile, "./config/logagent.ini") if err != nil { log.Fatal(err) } //1.初始化kafka连接 err = kafka.Init([]string{iniFile.Address}) if err != nil { fmt.Println("init kafka failed ,err:",err) return } fmt.Println("init kafka success!!") //2.打开日志文件准备收集日志 err = taillog.Init(iniFile.Path) if err != nil { fmt.Println("init tail failed ,err:",err) return } fmt.Println("init tail success!!") run() }
最后
以上就是自由电脑最近收集整理的关于Kafka日志收集简单实现的全部内容,更多相关Kafka日志收集简单实现内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复