本文整理汇总了Golang中github.com/hashicorp/raft.PeerContained函数的典型用法代码示例。如果您正苦于以下问题:Golang PeerContained函数的具体用法?Golang PeerContained怎么用?Golang PeerContained使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了PeerContained函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: handleJoinRequest
// handleJoinRequest handles a request to join the cluster
func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinResponse, error) {
r.traceCluster("join request from: %v", *req.Addr)
node, err := func() (*NodeInfo, error) {
// attempt to create the node
node, err := r.store.CreateNode(*req.Addr)
// if it exists, return the existing node
if err == ErrNodeExists {
node, err = r.store.NodeByHost(*req.Addr)
if err != nil {
return node, err
}
r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
} else if err != nil {
return nil, fmt.Errorf("create node: %v", err)
}
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}
// If we have less than 3 nodes, add them as raft peers if they are not
// already a peer
if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
if err = r.store.AddPeer(*req.Addr); err != nil {
return node, fmt.Errorf("add peer: %v", err)
}
}
return node, err
}()
nodeID := uint64(0)
if node != nil {
nodeID = node.ID
}
if err != nil {
return nil, err
}
// get the current raft peers
peers, err := r.store.Peers()
if err != nil {
return nil, fmt.Errorf("list peers: %v", err)
}
return &internal.JoinResponse{
Header: &internal.ResponseHeader{
OK: proto.Bool(true),
},
EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
RaftNodes: peers,
NodeID: proto.Uint64(nodeID),
}, err
}
开发者ID:kfitzpatrick,项目名称:influxdb,代码行数:60,代码来源:rpc.go
示例2: open
func (r *localRaft) open() error {
s := r.store
// Setup raft configuration.
config := raft.DefaultConfig()
config.Logger = s.Logger
config.HeartbeatTimeout = s.HeartbeatTimeout
config.ElectionTimeout = s.ElectionTimeout
config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
config.CommitTimeout = s.CommitTimeout
// If no peers are set in the config or there is one and we are it, then start as a single server.
config.EnableSingleNode = (len(s.peers) == 0) || len(s.peers) == 1 && raft.PeerContained(s.peers, s.Addr.String())
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, os.Stderr)
// Create peer storage.
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
r.raft = ra
return nil
}
开发者ID:marcosnils,项目名称:influxdb,代码行数:44,代码来源:state.go
示例3: setupRaft
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput)
if err != nil {
return err
}
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout,
s.config.LogOutput)
s.raftTransport = trans
// Create the backend raft store for logs and stable storage
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
var peers raft.PeerStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
peers = &raft.StaticPeers{}
s.raftPeers = peers
} else {
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
// Create the BoltDB backend
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
s.raftStore = store
stable = store
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
snap = snapshots
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
peers = s.raftPeers
}
// Ensure local host is always included if we are in bootstrap mode
if s.config.RaftConfig.EnableSingleNode {
p, err := peers.Peers()
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
if !raft.PeerContained(p, trans.LocalAddr()) {
peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the leader channel
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, peers, trans)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
//.........这里部分代码省略.........
开发者ID:bastiaanb,项目名称:nomad,代码行数:101,代码来源:server.go
示例4: setupRaft
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the base path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.config.LogOutput)
if err != nil {
return err
}
// Create the MDB store for logs and stable storage
store, err := raftmdb.NewMDBStoreWithSize(path, raftDBSize)
if err != nil {
return err
}
s.raftStore = store
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
store.Close()
return err
}
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
s.raftTransport = trans
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
// Ensure local host is always included if we are in bootstrap mode
if s.config.Bootstrap {
peers, err := s.raftPeers.Peers()
if err != nil {
store.Close()
return err
}
if !raft.PeerContained(peers, trans.LocalAddr()) {
s.raftPeers.SetPeers(raft.AddUniquePeer(peers, trans.LocalAddr()))
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store,
snapshots, s.raftPeers, trans)
if err != nil {
store.Close()
trans.Close()
return err
}
// Start monitoring leadership
go s.monitorLeadership()
return nil
}
开发者ID:rayleyva,项目名称:consul,代码行数:69,代码来源:server.go
示例5: open
func (r *localRaft) open() error {
s := r.store
// Setup raft configuration.
config := raft.DefaultConfig()
config.LogOutput = ioutil.Discard
if s.clusterTracingEnabled {
config.Logger = s.Logger
}
config.HeartbeatTimeout = s.HeartbeatTimeout
config.ElectionTimeout = s.ElectionTimeout
config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
config.CommitTimeout = s.CommitTimeout
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(s.peers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
// Don't shutdown raft automatically if we renamed our hostname back to a previous name
config.ShutdownOnRemove = false
}
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(s.RaftListener, s.RemoteAddr)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
// Create peer storage.
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
// Make sure our address is in the raft peers or we won't be able to boot into the cluster
if len(peers) > 0 && !raft.PeerContained(peers, s.RemoteAddr.String()) {
s.Logger.Printf("%v is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path())
return fmt.Errorf("peers out of sync: %v not in %v", s.RemoteAddr.String(), peers)
}
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
r.raft = ra
return nil
}
开发者ID:GoIncremental,项目名称:influxdb,代码行数:65,代码来源:state.go
示例6: setupRaft
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap || s.config.DevMode {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return err
}
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
s.raftTransport = trans
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
s.raftPeers = &raft.StaticPeers{}
} else {
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
// Create the backend raft store for logs and stable storage
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
s.raftStore = store
stable = store
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
store.Close()
return err
}
snap = snapshots
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
}
// Ensure local host is always included if we are in bootstrap mode
if s.config.Bootstrap {
peerAddrs, err := s.raftPeers.Peers()
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
if !raft.PeerContained(peerAddrs, trans.LocalAddr()) {
s.raftPeers.SetPeers(raft.AddUniquePeer(peerAddrs, trans.LocalAddr()))
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, s.raftPeers, trans)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
trans.Close()
return err
}
// Start monitoring leadership
go s.monitorLeadership()
return nil
}
开发者ID:nathan7,项目名称:consul,代码行数:96,代码来源:server.go
示例7: open
func (r *localRaft) open() error {
r.closing = make(chan struct{})
s := r.store
// Setup raft configuration.
config := raft.DefaultConfig()
config.LogOutput = ioutil.Discard
if s.clusterTracingEnabled {
config.Logger = s.Logger
}
config.HeartbeatTimeout = s.HeartbeatTimeout
config.ElectionTimeout = s.ElectionTimeout
config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
config.CommitTimeout = s.CommitTimeout
// Since we actually never call `removePeer` this is safe.
// If in the future we decide to call remove peer we have to re-evaluate how to handle this
config.ShutdownOnRemove = false
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(s.peers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
}
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(s.RaftListener, s.RemoteAddr)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
// Create peer storage.
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
// For single-node clusters, we can update the raft peers before we start the cluster if the hostname
// has changed.
if config.EnableSingleNode {
if err := r.peerStore.SetPeers([]string{s.RemoteAddr.String()}); err != nil {
return err
}
peers = []string{s.RemoteAddr.String()}
}
// If we have multiple nodes in the cluster, make sure our address is in the raft peers or
// we won't be able to boot into the cluster because the other peers will reject our new hostname. This
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, s.RemoteAddr.String()) {
s.Logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path())
return fmt.Errorf("peers out of sync: %v not in %v", s.RemoteAddr.String(), peers)
}
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
r.raft = ra
r.wg.Add(1)
go r.logLeaderChanges()
return nil
}
开发者ID:rzagabe,项目名称:telegraf,代码行数:83,代码来源:state.go
示例8: open
func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) error {
r.ln = ln
r.closing = make(chan struct{})
// Setup raft configuration.
config := raft.DefaultConfig()
config.LogOutput = ioutil.Discard
if r.config.ClusterTracing {
config.Logger = r.logger
}
config.HeartbeatTimeout = time.Duration(r.config.HeartbeatTimeout)
config.ElectionTimeout = time.Duration(r.config.ElectionTimeout)
config.LeaderLeaseTimeout = time.Duration(r.config.LeaderLeaseTimeout)
config.CommitTimeout = time.Duration(r.config.CommitTimeout)
// Since we actually never call `removePeer` this is safe.
// If in the future we decide to call remove peer we have to re-evaluate how to handle this
config.ShutdownOnRemove = false
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(r.addr, r.ln)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
// Create peer storage.
r.peerStore = &peerStore{}
// This server is joining the raft cluster for the first time if initializePeers are passed in
if len(initializePeers) > 0 {
if err := r.peerStore.SetPeers(initializePeers); err != nil {
return err
}
}
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(initializePeers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
// Make sure our peer address is here. This happens with either a single node cluster
// or a node joining the cluster, as no one else has that information yet.
if !raft.PeerContained(peers, r.addr) {
if err := r.peerStore.SetPeers([]string{r.addr}); err != nil {
return err
}
}
peers = []string{r.addr}
}
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(r.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(r.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
r.raft = ra
r.wg.Add(1)
go r.logLeaderChanges()
return nil
}
开发者ID:hawson,项目名称:influxdb,代码行数:83,代码来源:raft_state.go
示例9: NewRaft
// NewRaft creates a new Raft instance. raft data is stored under the raft dir in prefix.
func NewRaft(c RaftConfig, prefix string, logDir string) (r *Raft, err error) {
r = new(Raft)
config := raft.DefaultConfig()
config.EnableSingleNode = c.Single
var logOutput *os.File
if logDir != "\n" {
logFile := path.Join(logDir, "raft.log")
logOutput, err = os.OpenFile(logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
Fatal("Could not open raft log file: ", err)
}
config.LogOutput = logOutput
}
raftDir := path.Join(prefix, "raft")
err = os.MkdirAll(raftDir, 0755)
if err != nil {
Fatal("Could not create raft storage dir: ", err)
}
fss, err := raft.NewFileSnapshotStore(raftDir, 1, nil)
if err != nil {
Error("Could not initialize raft snapshot store: ", err)
return
}
// this should be our externally visible address. If not provided in the
// config as 'advertise', we use the address of the listen config.
if c.Advertise == nil {
c.Advertise = &c.Listen
}
a, err := net.ResolveTCPAddr("tcp", *c.Advertise)
if err != nil {
Error("Could not lookup raft advertise address: ", err)
return
}
r.transport, err = raft.NewTCPTransport(c.Listen, a, 3, 10*time.Second, nil)
if err != nil {
Error("Could not create raft transport: ", err)
return
}
peerStore := raft.NewJSONPeers(raftDir, r.transport)
if !c.Single {
var peers []net.Addr
peers, err = peerStore.Peers()
if err != nil {
return
}
for _, peerStr := range c.Peers {
peer, err := net.ResolveTCPAddr("tcp", peerStr)
if err != nil {
Fatal("Bad peer:", err)
}
if !raft.PeerContained(peers, peer) {
peerStore.SetPeers(raft.AddUniquePeer(peers, peer))
}
}
} else {
Warn("Running in single node permitted mode. Only use this for testing!")
}
r.mdb, err = raftmdb.NewMDBStore(raftDir)
if err != nil {
Error("Could not create raft store:", err)
return
}
storage, err := NewStorage()
if err != nil {
Error("Could not create storage:", err)
return
}
r.fsm = &FSM{storage}
r.raft, err = raft.NewRaft(config, r.fsm, r.mdb, r.mdb, fss, peerStore, r.transport)
if err != nil {
Error("Could not initialize raft: ", err)
return
}
return
}
开发者ID:hfeeki,项目名称:delayd,代码行数:92,代码来源:raft.go
示例10: setupRaft
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the base state path
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
if err := os.RemoveAll(statePath); err != nil {
return err
}
if err := ensurePath(statePath, true); err != nil {
return err
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
if err != nil {
return err
}
// Set the maximum raft size based on 32/64bit. Since we are
// doing an mmap underneath, we need to limit our use of virtual
// address space on 32bit, but don't have to care on 64bit.
dbSize := raftDBSize32bit
if runtime.GOARCH == "amd64" {
dbSize = raftDBSize64bit
}
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
// Create the MDB store for logs and stable storage
store, err := raftmdb.NewMDBStoreWithSize(path, dbSize)
if err != nil {
return err
}
s.raftStore = store
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
store.Close()
return err
}
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
s.raftTransport = trans
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
// Ensure local host is always included if we are in bootstrap mode
if s.config.Bootstrap {
peers, err := s.raftPeers.Peers()
if err != nil {
store.Close()
return err
}
if !raft.PeerContained(peers, trans.LocalAddr()) {
s.raftPeers.SetPeers(raft.AddUniquePeer(peers, trans.LocalAddr()))
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, cacheStore, store,
snapshots, s.raftPeers, trans)
if err != nil {
store.Close()
trans.Close()
return err
}
// Start monitoring leadership
go s.monitorLeadership()
return nil
}
开发者ID:jefferai,项目名称:consul,代码行数:93,代码来源:server.go
示例11: setupRaft
// setupRaft is used to setup and initialize Raft
func (c *cerebrum) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if c.config.Bootstrap {
c.config.RaftConfig.EnableSingleNode = true
}
// Create the base state path
statePath := filepath.Join(c.config.DataPath, tmpStatePath)
if err := os.RemoveAll(statePath); err != nil {
return err
}
if err := os.MkdirAll(filepath.Dir(statePath), 0755); err != nil {
return err
}
// Create the base raft path
path := filepath.Join(c.config.DataPath, RaftStateDir)
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return err
}
// Create the backend raft store for logs and stable storage
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
c.raftStore = store
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(c.config.LogCacheSize, store)
if err != nil {
store.Close()
return err
}
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, c.config.SnapshotsRetained, c.config.LogOutput)
if err != nil {
store.Close()
return err
}
// Try to bind
addr, err := net.ResolveTCPAddr("tcp", c.config.RaftBindAddr)
if err != nil {
return err
}
// Start TCP listener
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return err
}
// Create connection layer and transport
layer := NewRaftLayer(c.dialer, listener.Addr(), c.config.TLSConfig)
c.raftTransport = raft.NewNetworkTransport(layer, 3, 10*time.Second, c.config.LogOutput)
// Create TLS connection dispatcher
dispatcher := yamuxer.NewDispatcher(log.NewLogger(c.config.LogOutput, "dispatcher"), nil)
dispatcher.Register(connRaft, layer)
dispatcher.Register(connForward, &ForwardingHandler{c.applier, log.NewLogger(c.config.LogOutput, "forwarder")})
// Create TLS connection muxer
c.muxer = yamuxer.New(c.context, &yamuxer.Config{
Listener: listener,
TLSConfig: c.config.TLSConfig,
Deadline: c.config.ConnectionDeadline,
LogOutput: c.config.LogOutput,
Dispatcher: dispatcher,
})
// Setup the peer store
c.raftPeers = raft.NewJSONPeers(path, c.raftTransport)
// Ensure local host is always included if we are in bootstrap mode
if c.config.Bootstrap {
peers, err := c.raftPeers.Peers()
if err != nil {
store.Close()
return err
}
if !raft.PeerContained(peers, c.raftTransport.LocalAddr()) {
c.raftPeers.SetPeers(raft.AddUniquePeer(peers, c.raftTransport.LocalAddr()))
}
}
// Make sure we set the LogOutput
c.config.RaftConfig.LogOutput = c.config.LogOutput
// Setup the Raft store
c.raft, err = raft.NewRaft(c.config.RaftConfig, c.fsm, cacheStore, store,
snapshots, c.raftPeers, c.raftTransport)
if err != nil {
store.Close()
c.raftTransport.Close()
return err
}
//.........这里部分代码省略.........
开发者ID:blacklabeldata,项目名称:cerebrum,代码行数:101,代码来源:server.go
注:本文中的github.com/hashicorp/raft.PeerContained函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论