在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
package nsqd import ( "bytes" "encoding/json" "net" "os" "strconv" "time" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/version" )
func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) { return func(lp *lookupPeer) { ci := make(map[string]interface{}) ci["version"] = version.Binary ci["tcp_port"] = n.RealTCPAddr().Port ci["http_port"] = n.RealHTTPAddr().Port ci["hostname"] = hostname ci["broadcast_address"] = n.getOpts().BroadcastAddress cmd, err := nsq.Identify(ci) if err != nil { lp.Close() return } resp, err := lp.Command(cmd) if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err) } else if bytes.Equal(resp, []byte("E_INVALID")) { n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp) } else { err = json.Unmarshal(resp, &lp.Info) if err != nil { n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp) } else { n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info) } } go func() { syncTopicChan <- lp }() } }
func (n *NSQD) lookupLoop() { var lookupPeers []*lookupPeer var lookupAddrs []string syncTopicChan := make(chan *lookupPeer) connect := true hostname, err := os.Hostname() if err != nil { n.logf("FATAL: failed to get hostname - %s", err) os.Exit(1) } // for announcements, lookupd determines the host automatically ticker := time.Tick(15 * time.Second) for { if connect { for _, host := range n.getOpts().NSQLookupdTCPAddresses { if in(host, lookupAddrs) { continue } n.logf("LOOKUP(%s): adding peer", host) lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger, connectCallback(n, hostname, syncTopicChan)) lookupPeer.Command(nil) // start the connection lookupPeers = append(lookupPeers, lookupPeer) lookupAddrs = append(lookupAddrs, host) } n.lookupPeers.Store(lookupPeers) connect = false } select { case <-ticker: // send a heartbeat and read a response (read detects closed conns) for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) cmd := nsq.Ping() _, err := lookupPeer.Command(cmd) if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) } } case val := <-n.notifyChan: var cmd *nsq.Command var branch string switch val.(type) { case *Channel: // notify all nsqlookupds that a new channel exists, or that it's removed branch = "channel" channel := val.(*Channel) if channel.Exiting() == true { cmd = nsq.UnRegister(channel.topicName, channel.name) } else { cmd = nsq.Register(channel.topicName, channel.name) } case *Topic: // notify all nsqlookupds that a new topic exists, or that it's removed branch = "topic" topic := val.(*Topic) if topic.Exiting() == true { cmd = nsq.UnRegister(topic.name, "") } else { cmd = nsq.Register(topic.name, "") } } for _, lookupPeer := range lookupPeers { n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd) _, err := lookupPeer.Command(cmd) if err != nil { n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) } } case lookupPeer := <-syncTopicChan: var commands []*nsq.Command // build all the commands first so we exit the lock(s) as fast as possible n.RLock() for _, topic := range n.topicMap { topic.RLock() if len(topic.channelMap) == 0 { commands = append(commands, nsq.Register(topic.name, "")) } else { for _, channel := range topic.channelMap { commands = append(commands, nsq.Register(channel.topicName, channel.name)) } } topic.RUnlock() } n.RUnlock() for _, cmd := range commands { n.logf("LOOKUPD(%s): %s", lookupPeer, cmd)
|
请发表评论