Main topics of this section:

1. Log collection system design
    1. Project background
    2. Proposed solution
    3. Problems faced
    4. Industry solution: the ELK log collection architecture
       Problem with that approach:
       a. High operations cost: every additional log source to collect requires a manual configuration change
6. Log collection system design (the design used in this project)
Component overview:

7. Kafka use cases
    2. Application decoupling via a message queue
    3. Traffic peak shaving (a minimal sketch of both ideas follows below)
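To make the decoupling and peak-shaving ideas concrete, here is a minimal Go sketch (not part of the original notes) in which a buffered channel stands in for the message queue; the fast producer and slow consumer are hypothetical stand-ins for a web handler and a downstream service.

package main

import (
    "fmt"
    "time"
)

func main() {
    // The buffered channel plays the role of the message queue:
    // the producer is not blocked by the consumer's pace (decoupling),
    // and short bursts are absorbed by the buffer (peak shaving).
    queue := make(chan string, 100)

    // Fast producer: emits a burst of messages.
    go func() {
        for i := 0; i < 10; i++ {
            queue <- fmt.Sprintf("request-%d", i)
        }
        close(queue)
    }()

    // Slow consumer: drains the queue at its own rate.
    for msg := range queue {
        fmt.Println("processing", msg)
        time.Sleep(100 * time.Millisecond)
    }
}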
8. ZooKeeper use cases
    1. Service registration & service discovery (see the sketch below)
    2. Configuration center
    3. Distributed locks
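The original notes do not include any ZooKeeper client code. As a rough illustration of service registration and discovery only, here is a sketch using the github.com/samuel/go-zookeeper/zk client; this library choice, the /services/logagent path, and the node data are assumptions, not taken from the post.

package main

import (
    "fmt"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

func main() {
    // Connect to ZooKeeper (address taken from the environment used in these notes).
    conn, _, err := zk.Connect([]string{"192.168.30.136:2181"}, 5*time.Second)
    if err != nil {
        fmt.Println("connect zookeeper failed:", err)
        return
    }
    defer conn.Close()

    // Register this instance as an ephemeral node; it disappears automatically
    // when the session ends, which is what makes discovery reflect liveness.
    // The parent path /services/logagent is assumed to exist already.
    path, err := conn.Create("/services/logagent/node-1",
        []byte("192.168.30.136:8080"),
        zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
    if err != nil {
        fmt.Println("register service failed:", err)
        return
    }
    fmt.Println("registered at", path)

    // Discovery side: list the currently registered instances.
    children, _, err := conn.Children("/services/logagent")
    if err != nil {
        fmt.Println("list services failed:", err)
        return
    }
    fmt.Println("live instances:", children)
}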
9. Installing Kafka
   See this blog post: https://www.cnblogs.com/xuejiale/p/10505391.html

10. Log agent design and workflow
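The original notes describe the agent's design and flow only at the level of the heading above (tail the log files and forward new lines to Kafka). As a rough illustration, and not the project's actual code, the sketch below combines the two libraries introduced later in this post (github.com/hpcloud/tail and github.com/Shopify/sarama); the file path, topic, and broker address are placeholders borrowed from the later examples.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    "github.com/hpcloud/tail"
)

func main() {
    // Kafka producer with the same settings as the examples below.
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
    if err != nil {
        fmt.Println("create producer failed:", err)
        return
    }
    defer producer.Close()

    // Tail the log file and keep following it as it grows.
    tails, err := tail.TailFile("./access.log", tail.Config{
        ReOpen:    true,
        Follow:    true,
        MustExist: false,
        Poll:      true,
    })
    if err != nil {
        fmt.Println("tail file failed:", err)
        return
    }

    // Forward every new line to the nginx_log topic.
    for line := range tails.Lines {
        msg := &sarama.ProducerMessage{
            Topic: "nginx_log",
            Value: sarama.StringEncoder(line.Text),
        }
        if _, _, err := producer.SendMessage(msg); err != nil {
            fmt.Println("send to kafka failed:", err)
        }
    }
}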
11. Kafka examples

First, get the third-party package: github.com/Shopify/sarama

My Kafka and ZooKeeper are both installed on Linux (CentOS 6.5, IP: 192.168.30.136):

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }
    defer client.Close()

    for {
        msg := &sarama.ProducerMessage{}
        msg.Topic = "nginx_log"
        msg.Value = sarama.StringEncoder("this is a good test, my message is good")

        pid, offset, err := client.SendMessage(msg)
        if err != nil {
            fmt.Println("send message failed,", err)
            return
        }

        fmt.Printf("pid:%v offset:%v\n", pid, offset)
        time.Sleep(time.Second)
    }
}

Note: on Shopify/sarama's synchronous vs. asynchronous producers, see https://www.jianshu.com/p/666d2604e8f8.

Running the program on Windows and sending data to Kafka on Linux: (screenshot in the original post)

Kafka on Linux receiving the data: (screenshot in the original post)

Here is another Kafka produce-and-consume example. The producer:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    // Create a new sarama config instance.
    config := sarama.NewConfig()
    // WaitForAll waits for all in-sync replicas to commit before responding.
    config.Producer.RequiredAcks = sarama.WaitForAll
    // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    // New producer.
    client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }
    defer client.Close()

    // New message.
    msg := &sarama.ProducerMessage{}
    msg.Topic = "food"
    msg.Key = sarama.StringEncoder("fruit")
    msg.Value = sarama.StringEncoder("apple")

    // Send message.
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed,", err)
        return
    }
    fmt.Printf("pid: %v, offset:%v\n", pid, offset)

    // Second message.
    msg2 := &sarama.ProducerMessage{}
    msg2.Topic = "food"
    msg2.Key = sarama.StringEncoder("fruit")
    msg2.Value = sarama.StringEncoder("orange")

    // Send the second message.
    pid2, offset2, err := client.SendMessage(msg2)
    if err != nil {
        fmt.Println("send message failed,", err)
        return
    }
    fmt.Printf("pid2: %v, offset2:%v\n", pid2, offset2)

    fmt.Println("Produce success.")
}

The consumer:

package main

import (
    "fmt"
    "sync"

    "github.com/Shopify/sarama"
)

var wg sync.WaitGroup

func main() {
    consumer, err := sarama.NewConsumer([]string{"192.168.30.136:9092"}, nil)
    if err != nil {
        fmt.Println("consumer connect error:", err)
        return
    }
    fmt.Println("connect success...")
    defer consumer.Close()

    partitions, err := consumer.Partitions("food")
    if err != nil {
        fmt.Println("get partitions failed, err:", err)
        return
    }

    for _, p := range partitions {
        partitionConsumer, err := consumer.ConsumePartition("food", p, sarama.OffsetOldest)
        if err != nil {
            fmt.Println("partitionConsumer err:", err)
            continue
        }
        wg.Add(1)
        go func() {
            for m := range partitionConsumer.Messages() {
                fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
            }
            wg.Done()
        }()
    }
    wg.Wait()

    fmt.Println("Consumer success.")
}
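The note above points to an article on sarama's synchronous vs. asynchronous producers. As a rough sketch of the asynchronous side (not code from the original post), the example below reuses the same broker address and the nginx_log topic; with an AsyncProducer, sends are channel writes and delivery results come back on the Successes and Errors channels.

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true

    producer, err := sarama.NewAsyncProducer([]string{"192.168.30.136:9092"}, config)
    if err != nil {
        fmt.Println("create async producer failed:", err)
        return
    }
    defer producer.Close()

    // Delivery results come back on channels instead of as return values.
    go func() {
        for {
            select {
            case msg := <-producer.Successes():
                fmt.Printf("sent: partition=%d offset=%d\n", msg.Partition, msg.Offset)
            case err := <-producer.Errors():
                fmt.Println("send failed:", err.Err)
            }
        }
    }()

    // Sending is just a channel write; it does not block waiting for the broker.
    for i := 0; i < 5; i++ {
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "nginx_log",
            Value: sarama.StringEncoder(fmt.Sprintf("async message %d", i)),
        }
    }

    // Give the result goroutine a moment to print in this sketch.
    time.Sleep(time.Second)
}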
12. Using the tailf package

First, get the third-party package: github.com/hpcloud/tail

package main

import (
    "fmt"
    "time"

    "github.com/hpcloud/tail"
)

func main() {
    filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log"
    tails, err := tail.TailFile(filename, tail.Config{
        ReOpen: true,
        Follow: true,
        //Location: &tail.SeekInfo{Offset: 0, Whence: 2},
        MustExist: false,
        Poll:      true,
    })
    if err != nil {
        fmt.Println("tail file err:", err)
        return
    }

    var msg *tail.Line
    var ok bool
    for {
        msg, ok = <-tails.Lines
        if !ok {
            fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
            time.Sleep(100 * time.Millisecond)
            continue
        }
        fmt.Println("msg:", msg)
    }
}

Contents of my.log (Unix line endings): (screenshot in the original post)

On Windows, when the log file (my.log) used by the code above has Windows line endings, the program output is as follows: (screenshot)

After converting the file to Unix format with Notepad++, the output is as follows: (screenshot)

Note: the last line must end with a newline character, otherwise that line cannot be read.

13. Using the configuration library

First, get the third-party package: github.com/astaxie/beego/config

1) Initialize the config library

iniconf, err := NewConfig("ini", "testini.conf")
if err != nil {
    log.Fatal(err)
}

2) Read configuration items

String(key string) string
Int(key string) (int, error)
Int64(key string) (int64, error)
Bool(key string) (bool, error)
Float(key string) (float64, error)

For example:

iniconf.String("server::listen_ip")
iniconf.Int("server::listen_port")

A sample configuration file:

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[logs]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

A complete example:

package main

import (
    "fmt"

    "github.com/astaxie/beego/config"
)

func main() {
    conf, err := config.NewConfig("ini", "./logcollect.conf")
    if err != nil {
        fmt.Println("new config failed, err:", err)
        return
    }

    port, err := conf.Int("server::listen_port")
    if err != nil {
        fmt.Println("read server::listen_port failed, err:", err)
        return
    }
    fmt.Println("Port:", port)

    // String does not return an error; a missing key yields an empty string.
    log_level := conf.String("log::log_level")
    fmt.Println("log_level:", log_level)

    log_path := conf.String("log::log_path")
    fmt.Println("log_path:", log_path)
}

Configuration file contents:

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[log]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

Output: (screenshot in the original post)

14. Using the logging library

First, get the third-party package: github.com/astaxie/beego/logs

1) Configure the log component

config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug

configStr, err := json.Marshal(config)
if err != nil {
    fmt.Println("marshal failed, err:", err)
    return
}

2) Initialize the log component

logs.SetLogger("file", string(configStr))
A complete example:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/astaxie/beego/logs"
)

func main() {
    config := make(map[string]interface{})
    config["filename"] = "./logcollect.log"
    config["level"] = logs.LevelDebug

    configStr, err := json.Marshal(config)
    if err != nil {
        fmt.Println("marshal failed, err:", err)
        return
    }

    logs.SetLogger(logs.AdapterFile, string(configStr))

    logs.Debug("this is a test, my name is %s", "stu01")
    logs.Trace("this is a trace, my name is %s", "stu02")
    logs.Warn("this is a warn, my name is %s", "stu03")
}

15. Overall implementation of the log collection project

Development environment: Windows, go version go1.12.1 windows/amd64, kafka_2.11-2.0.0, zookeeper-3.4.12.

A first demo (V1) has been implemented:

(1) Code structure diagram (screenshot in the original post)
(2) The code is on my GitHub: https://github.com/XJL635438451/logCollectProject/tree/master

(3) How to run

1) Install Go, Kafka, and ZooKeeper;

2) Start ZooKeeper first, then Kafka; the startup commands are below:

Start ZooKeeper:
.\zkServer.cmd

Start Kafka:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

Create a topic:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTest

Start a console producer:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kafkaTest

Start a console consumer:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic nginx_log --from-beginning

3) If you don't want to write log entries to the file yourself, run writeLogTest/log.go from the repo, then run main.exe (if you modify the code, rebuild it first);

4) You can start a Kafka consumer to check whether the logs are being written to Kafka, using the console-consumer command above; if everything is working, you will see the log lines continuously appearing in Kafka.
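writeLogTest/log.go itself is not reproduced in this post; as a hypothetical stand-in (not the repository's actual code), a minimal generator that keeps appending timestamped test lines to the tailed log file could look like this. The file path is a placeholder and should match the log_path in the config file.

package main

import (
    "fmt"
    "os"
    "time"
)

func main() {
    // Append timestamped test lines to the log file that the agent tails.
    f, err := os.OpenFile("./my.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        fmt.Println("open log file failed:", err)
        return
    }
    defer f.Close()

    for i := 0; ; i++ {
        line := fmt.Sprintf("%s test log line %d\n", time.Now().Format("2006-01-02 15:04:05"), i)
        if _, err := f.WriteString(line); err != nil {
            fmt.Println("write log failed:", err)
            return
        }
        time.Sleep(500 * time.Millisecond)
    }
}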