• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

nsqd.go

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
package nsqd

import (
    "crypto/tls"
    "crypto/x509"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "math/rand"
    "net"
    "os"
    "path"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/nsqio/nsq/internal/clusterinfo"
    "github.com/nsqio/nsq/internal/dirlock"
    "github.com/nsqio/nsq/internal/http_api"
    "github.com/nsqio/nsq/internal/protocol"
    "github.com/nsqio/nsq/internal/statsd"
    "github.com/nsqio/nsq/internal/util"
    "github.com/nsqio/nsq/internal/version"
)

const (
    TLSNotRequired = iota
    TLSRequiredExceptHTTP
    TLSRequired
)

type errStore struct {
    err error
}

type NSQD struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    clientIDSequence int64

    sync.RWMutex

    opts atomic.Value

    dl        *dirlock.DirLock
    isLoading int32
    errValue  atomic.Value
    startTime time.Time

    topicMap map[string]*Topic

    lookupPeers atomic.Value

    tcpListener   net.Listener
    httpListener  net.Listener
    httpsListener net.Listener
    tlsConfig     *tls.Config

    poolSize int

    idChan               chan MessageID
    notifyChan           chan interface{}
    optsNotificationChan chan struct{}
    exitChan             chan int
    waitGroup            util.WaitGroupWrapper

    ci *clusterinfo.ClusterInfo
}

func New(opts *Options) *NSQD {
    dataPath := opts.DataPath
    if opts.DataPath == "" {
        cwd, _ := os.Getwd()
        dataPath = cwd
    }

    n := &NSQD{
        startTime:            time.Now(),
        topicMap:             make(map[string]*Topic),
        idChan:               make(chan MessageID, 4096),
        exitChan:             make(chan int),
        notifyChan:           make(chan interface{}),
        optsNotificationChan: make(chan struct{}, 1),
        ci:                   clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)),
        dl:                   dirlock.New(dataPath),
    }
    n.swapOpts(opts)
    n.errValue.Store(errStore{})

    err := n.dl.Lock()
    if err != nil {
        n.logf("FATAL: --data-path=%s in use (possibly by another instance of nsqd)", dataPath)
        os.Exit(1)
    }

    if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
        n.logf("FATAL: --max-deflate-level must be [1,9]")
        os.Exit(1)
    }

    if opts.ID < 0 || opts.ID >= 1024 {
        n.logf("FATAL: --worker-id must be [0,1024)")
        os.Exit(1)
    }

    if opts.StatsdPrefix != "" {
        var port string
        _, port, err = net.SplitHostPort(opts.HTTPAddress)
        if err != nil {
            n.logf("ERROR: failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)
            os.Exit(1)
        }
        statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
        prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
        if prefixWithHost[len(prefixWithHost)-1] != '.' {
            prefixWithHost += "."
        }
        opts.StatsdPrefix = prefixWithHost
    }

    if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
        opts.TLSRequired = TLSRequired
    }

    tlsConfig, err := buildTLSConfig(opts)
    if err != nil {
        n.logf("FATAL: failed to build TLS config - %s", err)
        os.Exit(1)
    }
    if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
        n.logf("FATAL: cannot require TLS client connections without TLS key and cert")
        os.Exit(1)
    }
    n.tlsConfig = tlsConfig

    n.logf(version.String("nsqd"))
    n.logf("ID: %d", opts.ID)

    return n
}

func (n *NSQD) logf(f string, args ...interface{}) {
    if n.getOpts().Logger == nil {
        return
    }
    n.getOpts().Logger.Output(2, fmt.Sprintf(f, args...))
}

func (n *NSQD) getOpts() *Options {
    return n.opts.Load().(*Options)
}

func (n *NSQD) swapOpts(opts *Options) {
    n.opts.Store(opts)
}

func (n *NSQD) triggerOptsNotification() {
    select {
    case n.optsNotificationChan <- struct{}{}:
    default:
    }
}

func (n *NSQD) RealTCPAddr() *net.TCPAddr {
    n.RLock()
    defer n.RUnlock()
    return n.tcpListener.Addr().(*net.TCPAddr)
}

func (n *NSQD) RealHTTPAddr() *net.TCPAddr {
    n.RLock()
    defer n.RUnlock()
    return n.httpListener.Addr().(*net.TCPAddr)
}

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr {
    n.RLock()
    defer n.RUnlock()
    return n.httpsListener.Addr().(*net.TCPAddr)
}

func (n *NSQD) SetHealth(err error) {
    n.errValue.Store(errStore{err: err})
}

func (n *NSQD) IsHealthy() bool {
    return n.GetError() == nil
}

func (n *NSQD) GetError() error {
    errValue := n.errValue.Load()
    return errValue.(errStore 
                       
                    
                    

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go语言开发中GOPATH问题与go语言linux开发环境教程发布时间:2022-07-10
下一篇:
Go语言系列之标准库os发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap