在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
package nsqd import ( "crypto/tls" "crypto/x509" "encoding/json" "errors" "fmt" "io/ioutil" "math/rand" "net" "os" "path" "runtime" "strings" "sync" "sync/atomic" "time" "github.com/nsqio/nsq/internal/clusterinfo" "github.com/nsqio/nsq/internal/dirlock" "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/statsd" "github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/version" )
// TLS requirement levels for client connections. The values are consecutive
// untyped integers starting at zero, in declaration order.
const (
	// TLSNotRequired is the zero value. New upgrades it to TLSRequired when a
	// TLS client auth policy is configured.
	TLSNotRequired = iota

	// TLSRequiredExceptHTTP presumably requires TLS everywhere except on the
	// HTTP listener - TODO confirm against the code that consumes it.
	TLSRequiredExceptHTTP

	// TLSRequired is the level New selects when a client auth policy is set
	// but no requirement level was chosen.
	TLSRequired
)
// errStore wraps an error so that it can be kept in an atomic.Value.
// An atomic.Value cannot hold nil and must always be given the same concrete
// type, so the "no error" state is stored as an errStore whose err is nil.
type errStore struct {
	err error
}
type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 sync.RWMutex opts atomic.Value dl *dirlock.DirLock isLoading int32 errValue atomic.Value startTime time.Time topicMap map[string]*Topic lookupPeers atomic.Value tcpListener net.Listener httpListener net.Listener httpsListener net.Listener tlsConfig *tls.Config poolSize int idChan chan MessageID notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int waitGroup util.WaitGroupWrapper ci *clusterinfo.ClusterInfo }
func New(opts *Options) *NSQD { dataPath := opts.DataPath if opts.DataPath == "" { cwd, _ := os.Getwd() dataPath = cwd } n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), idChan: make(chan MessageID, 4096), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)), dl: dirlock.New(dataPath), } n.swapOpts(opts) n.errValue.Store(errStore{}) err := n.dl.Lock() if err != nil { n.logf("FATAL: --data-path=%s in use (possibly by another instance of nsqd)", dataPath) os.Exit(1) } if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 { n.logf("FATAL: --max-deflate-level must be [1,9]") os.Exit(1) } if opts.ID < 0 || opts.ID >= 1024 { n.logf("FATAL: --worker-id must be [0,1024)") os.Exit(1) } if opts.StatsdPrefix != "" { var port string _, port, err = net.SplitHostPort(opts.HTTPAddress) if err != nil { n.logf("ERROR: failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err) os.Exit(1) } statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port)) prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1) if prefixWithHost[len(prefixWithHost)-1] != '.' { prefixWithHost += "." } opts.StatsdPrefix = prefixWithHost } if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired { opts.TLSRequired = TLSRequired } tlsConfig, err := buildTLSConfig(opts) if err != nil { n.logf("FATAL: failed to build TLS config - %s", err) os.Exit(1) } if tlsConfig == nil && opts.TLSRequired != TLSNotRequired { n.logf("FATAL: cannot require TLS client connections without TLS key and cert") os.Exit(1) } n.tlsConfig = tlsConfig n.logf(version.String("nsqd")) n.logf("ID: %d", opts.ID) return n }
func (n *NSQD) logf(f string, args ...interface{}) { if n.getOpts().Logger == nil { return } n.getOpts().Logger.Output(2, fmt.Sprintf(f, args...)) }
// getOpts returns the options currently stored on n.
// It panics if nothing has been stored yet or if the stored value is not an
// *Options, because the type assertion is unchecked.
func (n *NSQD) getOpts() *Options {
	current := n.opts.Load()
	return current.(*Options)
}
// swapOpts atomically replaces the options stored on n with opts, so that
// subsequent getOpts calls return the new value.
func (n *NSQD) swapOpts(opts *Options) {
	n.opts.Store(opts)
}
// triggerOptsNotification signals on optsNotificationChan without blocking.
// If the channel cannot accept the value right now (New creates it with a
// buffer of one, so a signal may already be pending), the send is dropped.
func (n *NSQD) triggerOptsNotification() {
	signal := struct{}{}
	select {
	case n.optsNotificationChan <- signal:
	default:
		// A notification is already queued; nothing more to do.
	}
}
func (n *NSQD) RealTCPAddr() *net.TCPAddr { n.RLock() defer n.RUnlock() return n.tcpListener.Addr().(*net.TCPAddr) }
func (n *NSQD) RealHTTPAddr() *net.TCPAddr { n.RLock() defer n.RUnlock() return n.httpListener.Addr().(*net.TCPAddr) }
func (n *NSQD) RealHTTPSAddr() *net.TCPAddr { n.RLock() defer n.RUnlock() return n.httpsListener.Addr().(*net.TCPAddr) }
// SetHealth records err as the current health state of n. Passing nil marks
// the instance healthy again (see IsHealthy). The error is wrapped in an
// errStore before being stored, so a nil err is stored safely.
func (n *NSQD) SetHealth(err error) {
	state := errStore{err: err}
	n.errValue.Store(state)
}
// IsHealthy reports whether GetError currently returns nil.
func (n *NSQD) IsHealthy() bool {
	if err := n.GetError(); err != nil {
		return false
	}
	return true
}
func (n *NSQD) GetError() error { errValue := n.errValue.Load() return errValue.(errStore |
请发表评论