在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
日志统计系统的整体思路就是监控各个文件夹下的日志,实时获取日志写入内容并写入kafka队列,写入kafka队列可以在高并发时排队,而且达到了逻辑解耦合的目的。然后从kafka队列中读出数据,根据实际需求显示网页上或者控制台等。 前情提要上一节我们完成了如下目标 本节目标1 写代码从kafka中读取消息,保证kafka消息读写功能无误。 从kafka中读取消息func main(){ fmt.Println("consumer begin...") config := sarama.NewConfig() config.Consumer.Return.Errors = true wg :=sync.WaitGroup{} //创建消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"},config) if err != nil { fmt.Println("consumer create failed, error is ", err.Error()) return } defer consumer.Close() //Partitions(topic):该方法返回了该topic的所有分区id partitionList, err := consumer.Partitions("test") if err != nil { fmt.Println("get consumer partitions failed") fmt.Println("error is ", err.Error()) return } for partition := range partitionList { //ConsumePartition方法根据主题, //分区和给定的偏移量创建创建了相应的分区消费者 //如果该分区消费者已经消费了该信息将会返回error //OffsetNewest消费最新数据 pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest) if err != nil { panic(err) } //异步关闭,保证数据落盘 defer pc.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() //Messages()该方法返回一个消费消息类型的只读通道,由代理产生 for msg := range pc.Messages() { fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) } wg.Wait() consumer.Close() }
这样我们启动zookeeper和kafka后,分别运行前文实现的向kafka中写入数据的代码,以及现在的从kafka中消费的代码,看到如下效果 实现文件监控实现文件监控,主要是在文件中有内容写入时,程序可以及时获取写入的内容,类似于Linux命令中的tailf -f 某个文件的功能。 logdir文件夹下的log.txt记录的是不断增加的日志文件 func main() { logrelative := `../logdir/log.txt` _, filename, _, _ := runtime.Caller(0) fmt.Println(filename) datapath := path.Join(path.Dir(filename), logrelative) fmt.Println(datapath) tailFile, err := tail.TailFile(datapath, tail.Config{ //文件被移除或被打包,需要重新打开 ReOpen: true, //实时跟踪 Follow: true, //如果程序出现异常,保存上次读取的位置,避免重新读取 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //支持文件不存在 MustExist: false, Poll: true, }) if err != nil { fmt.Println("tail file err:", err) return } for true { msg, ok := <-tailFile.Lines if !ok { fmt.Printf("tail file close reopen, filename: %s\n", tailFile.Filename) time.Sleep(100 * time.Millisecond) continue } //fmt.Println("msg:", msg) //只打印text fmt.Println("msg:", msg.Text) } } 为了测试监控的功能。我们实现向log.txt中每隔0.1s写入一行”Hello+时间戳”的日志。当写入20条内容后我们将log.txt备份重命名。 func writeLog(datapath string) { filew, err := os.OpenFile(datapath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) if err != nil { fmt.Println("open file error ", err.Error()) return } w := bufio.NewWriter(filew) for i := 0; i < 20; i++ { timeStr := time.Now().Format("2006-01-02 15:04:05") fmt.Fprintln(w, "Hello current time is "+timeStr) time.Sleep(time.Millisecond * 100) w.Flush() } logBak := time.Now().Format("20060102150405") + ".txt" logBak = path.Join(path.Dir(datapath), logBak) filew.Close() err = os.Rename(datapath, logBak) if err != nil { fmt.Println("Rename error ", err.Error()) return } } 然后我们实现main函数,调用三次writeLog,这样会产生三个备份文件 func main() { logrelative := `../logdir/log.txt` _, filename, _, _ := runtime.Caller(0) fmt.Println(filename) datapath := path.Join(path.Dir(filename), logrelative) for i := 0; i < 3; i++ { writeLog(datapath) } }
我们分别启动文件监控和文件写入程序,效果如下 总结目前我们已经完成了kafka消息读写,文件监控,动态写入和备份等功能,接下来我们实现项目的配置化和统筹代码。
|
请发表评论