在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
消息队列是架构级解耦方案,常用于流量削峰、应用解耦、异步处理 消息队列之NSQNSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。 NSQ介绍NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优势有以下优势: NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证 NSQ支持横向扩展,没有任何集中式代理。 NSQ易于配置和部署,并且内置了管理界面。 安装官方下载页面根据自己的平台下载并解压即可。 启动nsqd: nsqd -lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2019/07/18 16:02:50.968403 INFO: nsqd v1.1.0 (built w/go1.10.3) [nsqd] 2019/07/18 16:02:51.013659 INFO: ID: 826 [nsqd] 2019/07/18 16:02:51.014577 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2019/07/18 16:02:51.035655 INFO: HTTP: listening on [::]:4151 [nsqd] 2019/07/18 16:02:51.035655 INFO: LOOKUP(127.0.0.1:4160): adding peer [nsqd] 2019/07/18 16:02:51.038262 INFO: LOOKUP connecting to 127.0.0.1:4160 [nsqd] 2019/07/18 16:02:51.035655 INFO: TCP: listening on [::]:4150 启动nsqd: nsqlookupd [nsqlookupd] 2019/07/18 15:59:34.219793 INFO: nsqlookupd v1.1.0 (built w/go1.10.3) [nsqlookupd] 2019/07/18 15:59:34.279192 INFO: TCP: listening on [::]:4160 [nsqlookupd] 2019/07/18 15:59:34.279192 INFO: HTTP: listening on [::]:4161 运行nsqadmin管理: nsqadmin -lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2019/07/18 15:59:54.169512 INFO: nsqadmin v1.1.0 (built w/go1.10.3) [nsqadmin] 2019/07/18 15:59:54.213611 INFO: HTTP: listening on [::]:4171 创建topic http://127.0.0.1:4171/lookup topic_demo4 Go操作NSQ// nsq_producer/main.go package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) // NSQ Producer Demo var producer *nsq.Producer // 初始化生产者 func initProducer(str string) (err error) { config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil { fmt.Printf("create producer failed, err:%v\n", err) return err } return nil } func main() { nsqAddress := "127.0.0.1:4150" // nsqd的地址 err := initProducer(nsqAddress) if err != nil { fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 从标准输入读取 for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) // 去掉输入内容前后的空格 if strings.ToUpper(data) == "Q" { // 输入Q退出 break } // 向 'topic_demo' publish 数据 err = producer.Publish("topic_demo", []byte(data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } } } // nsq_consumer/main.go package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq" ) // NSQ Consumer Demo // MyHandler 是一个消费者类型 type MyHandler struct { Title string } // HandleMessage 是需要实现的处理消息的方法 func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return } // 初始化消费者 func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second // 15秒查询一次有没有新的nsqd节点加入进来 c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } // 创建一个结构体对象 consumer := &MyHandler{ Title: "鸣人一号", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询 return err } return nil } func main() { err := initConsumer("topic_demo", "xxx", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定义一个信号的通道 signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c <-c // 阻塞 }
|
请发表评论