本文整理汇总了Golang中github.com/couchbase/indexing/secondary/common.CrashOnError函数的典型用法代码示例。如果您正苦于以下问题:Golang CrashOnError函数的具体用法?Golang CrashOnError怎么用?Golang CrashOnError使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了CrashOnError函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: ValidateBucket
// ValidateBucket reports whether the cluster info cache returns at least one
// node for bucket and every entry of uuids equals the bucket UUID the cache
// currently reports. Failures to build or refresh the cache are handed to
// common.CrashOnError; a failed or empty node lookup yields false.
func ValidateBucket(cluster, bucket string, uuids []string) bool {
	var cinfo *common.ClusterInfoCache

	url, err := common.ClusterAuthUrl(cluster)
	if err == nil {
		cinfo, err = common.NewClusterInfoCache(url, DEFAULT_POOL)
	}
	if err != nil {
		logging.Fatalf("Indexer::Fail to init ClusterInfoCache : %v", err)
		common.CrashOnError(err)
	}

	// Hold the cache lock for the remainder of the function.
	cinfo.Lock()
	defer cinfo.Unlock()

	if fetchErr := cinfo.Fetch(); fetchErr != nil {
		logging.Errorf("Indexer::Fail to init ClusterInfoCache : %v", fetchErr)
		common.CrashOnError(fetchErr)
	}

	nodes, lookupErr := cinfo.GetNodesByBucket(bucket)
	if lookupErr != nil || len(nodes) == 0 {
		logging.Fatalf("Indexer::Error Fetching Bucket Info: %v Nids: %v", lookupErr, nodes)
		return false
	}

	// Every supplied UUID must match the bucket's current UUID.
	current := cinfo.GetBucketUUID(bucket)
	for i := range uuids {
		if uuids[i] != current {
			return false
		}
	}
	return true
}
开发者ID:prataprc,项目名称:indexing,代码行数:35,代码来源:util.go
示例2: deletePrimaryIndex
// deletePrimaryIndex removes the main-index entry built from docid, using the
// store assigned to workerId. A nil docid, or a failure to build the entry,
// is passed to common.CrashOnError. After a successful delete the elapsed
// time is recorded in the stKVDelete timing and the entry size is added to
// fdb.delete_bytes.
func (fdb *fdbSlice) deletePrimaryIndex(docid []byte, workerId int) {
	if docid == nil {
		common.CrashOnError(errors.New("Nil Primary Key"))
		return
	}

	// Convert the docid into the key format used by the index.
	entry, err := NewPrimaryIndexEntry(docid)
	common.CrashOnError(err)

	// Delete from the main index, timing the call.
	start := time.Now()
	delErr := fdb.main[workerId].DeleteKV(entry.Bytes())
	if delErr != nil {
		fdb.checkFatalDbError(delErr)
		logging.Errorf("ForestDBSlice::delete \n\tSliceId %v IndexInstId %v. Error deleting "+
			"entry from main index for Doc %s. Error %v", fdb.id, fdb.idxInstId,
			docid, delErr)
		return
	}

	fdb.idxStats.Timings.stKVDelete.Put(time.Since(start))
	platform.AddInt64(&fdb.delete_bytes, int64(len(entry.Bytes())))
}
开发者ID:prataprc,项目名称:indexing,代码行数:27,代码来源:forestdb_slice_writer.go
示例3: handleGetGlobalTopology
// handleGetGlobalTopology walks every index definition known to the manager,
// looks up the matching index instance in the per-bucket topology, and sends
// the resulting instance map to the supervisor on c.supvCmdch. Errors from
// creating the iterator or fetching a topology are passed to
// common.CrashOnError; definitions without an instance are logged and skipped.
func (c *clustMgrAgent) handleGetGlobalTopology(cmd Message) {
	logging.Debugf("ClustMgr:handleGetGlobalTopology %v", cmd)

	// Get the latest topology from the manager.
	metaIter, err := c.mgr.NewIndexDefnIterator()
	if err != nil {
		common.CrashOnError(err)
	}
	defer metaIter.Close()

	instances := make(common.IndexInstMap)
	for {
		// Iteration ends on the first error returned by Next.
		_, defn, nextErr := metaIter.Next()
		if nextErr != nil {
			break
		}
		idxDefn := *defn

		topology, topoErr := c.mgr.GetTopologyByBucket(idxDefn.Bucket)
		if topoErr != nil {
			common.CrashOnError(topoErr)
		}

		inst := topology.GetIndexInstByDefn(idxDefn.DefnId)
		if inst == nil {
			logging.Warnf("ClustMgr:handleGetGlobalTopology Index Instance Not "+
				"Found For Index Definition %v. Ignored.", idxDefn)
			continue
		}

		// For the indexer the Ready state does not matter: until the index
		// is built it is reported as Created.
		state := common.IndexState(inst.State)
		if state == common.INDEX_STATE_READY {
			state = common.INDEX_STATE_CREATED
		}

		instId := common.IndexInstId(inst.InstId)
		instances[instId] = common.IndexInst{
			InstId: instId,
			Defn:   idxDefn,
			State:  state,
			Stream: common.StreamId(inst.StreamId),
		}
	}

	c.supvCmdch <- &MsgClustMgrTopology{indexInstMap: instances}
}
开发者ID:prataprc,项目名称:indexing,代码行数:53,代码来源:cluster_manager_agent.go
示例4: OnIndexBuild
// OnIndexBuild forwards a build request for indexDefnList (restricted to
// buckets) to the admin channel and blocks until a reply arrives on a private
// response channel.
//
// It returns the error map carried by a CLUST_MGR_BUILD_INDEX_DDL_RESPONSE.
// For a MSG_ERROR reply it returns a map that assigns the same error to every
// requested definition id (each definition id is treated as the instance id).
// Any other reply, or an unexpectedly closed response channel, is logged and
// passed to common.CrashOnError; nil is returned if that call returns.
func (meta *metaNotifier) OnIndexBuild(indexDefnList []common.IndexDefnId, buckets []string) map[common.IndexInstId]error {
	logging.Infof("clustMgrAgent::OnIndexBuild Notification "+
		"Received for Build Index %v", indexDefnList)

	respCh := make(MsgChannel)

	// Definition ids double as instance ids in the request.
	var indexInstList []common.IndexInstId
	for _, defnId := range indexDefnList {
		indexInstList = append(indexInstList, common.IndexInstId(defnId))
	}

	meta.adminCh <- &MsgBuildIndex{indexInstList: indexInstList,
		respCh:     respCh,
		bucketList: buckets}

	// Wait for the response.
	if res, ok := <-respCh; ok {
		switch res.GetMsgType() {
		case CLUST_MGR_BUILD_INDEX_DDL_RESPONSE:
			errMap := res.(*MsgBuildIndexResponse).GetErrorMap()
			logging.Infof("clustMgrAgent::OnIndexBuild returns "+
				"for Build Index %v", indexDefnList)
			return errMap

		case MSG_ERROR:
			logging.Errorf("clustMgrAgent::OnIndexBuild Error "+
				"for Build Index %v. Error %v.", indexDefnList, res)
			err := res.(*MsgError).GetError()
			errMap := make(map[common.IndexInstId]error, len(indexDefnList))
			for _, instId := range indexDefnList {
				errMap[common.IndexInstId(instId)] = errors.New(err.String())
			}
			return errMap

		default:
			logging.Fatalf("clustMgrAgent::OnIndexBuild Unknown Response "+
				"Received for Build Index %v. Response %v", indexDefnList, res)
			common.CrashOnError(errors.New("Unknown Response"))
		}
	} else {
		// The message previously said "Create Index", copied from
		// OnIndexCreate; this handler serves Build Index.
		logging.Fatalf("clustMgrAgent::OnIndexBuild Unexpected Channel Close "+
			"for Build Index %v", indexDefnList)
		common.CrashOnError(errors.New("Unknown Response"))
	}

	return nil
}
开发者ID:prataprc,项目名称:indexing,代码行数:53,代码来源:cluster_manager_agent.go
示例5: OnIndexCreate
// OnIndexCreate builds an index instance in the Created state for indexDefn
// (using the definition id as the instance id and a default partition
// container), sends a create request on the admin channel, and blocks for the
// reply. It returns nil on MSG_SUCCESS and the reply's cause on MSG_ERROR.
// Any other reply, or a closed response channel, is logged and passed to
// common.CrashOnError.
func (meta *metaNotifier) OnIndexCreate(indexDefn *common.IndexDefn) error {
	logging.Infof("clustMgrAgent::OnIndexCreate Notification "+
		"Received for Create Index %v", indexDefn)

	request := &MsgCreateIndex{
		mType: CLUST_MGR_CREATE_INDEX_DDL,
		indexInst: common.IndexInst{
			InstId: common.IndexInstId(indexDefn.DefnId),
			Defn:   *indexDefn,
			State:  common.INDEX_STATE_CREATED,
			Pc:     meta.makeDefaultPartitionContainer(),
		},
		respCh: make(MsgChannel),
	}
	meta.adminCh <- request

	// Wait for the response.
	res, ok := <-request.respCh
	if !ok {
		logging.Fatalf("clustMgrAgent::OnIndexCreate Unexpected Channel Close "+
			"for Create Index %v", indexDefn)
		common.CrashOnError(errors.New("Unknown Response"))
		return nil
	}

	switch res.GetMsgType() {
	case MSG_SUCCESS:
		logging.Infof("clustMgrAgent::OnIndexCreate Success "+
			"for Create Index %v", indexDefn)
		return nil

	case MSG_ERROR:
		logging.Errorf("clustMgrAgent::OnIndexCreate Error "+
			"for Create Index %v. Error %v.", indexDefn, res)
		return res.(*MsgError).GetError().cause

	default:
		logging.Fatalf("clustMgrAgent::OnIndexCreate Unknown Response "+
			"Received for Create Index %v. Response %v", indexDefn, res)
		common.CrashOnError(errors.New("Unknown Response"))
	}
	return nil
}
开发者ID:prataprc,项目名称:indexing,代码行数:51,代码来源:cluster_manager_agent.go
示例6: siSplitEntry
// siSplitEntry splits a secondary index entry into its secondary key and its
// document id. tmp is handed to ReadSecKey as its buffer argument. Errors
// from either read are passed to c.CrashOnError.
func siSplitEntry(entry []byte, tmp []byte) ([]byte, []byte) {
	e := secondaryIndexEntry(entry)

	sk, err := e.ReadSecKey(tmp)
	c.CrashOnError(err)

	docid, err := e.ReadDocId(sk)
	// This error used to be dropped; check it before slicing docid.
	c.CrashOnError(err)

	// NOTE(review): presumably ReadDocId appends the doc id to sk, so the
	// doc id starts at len(sk) - confirm against ReadDocId.
	return sk, docid[len(sk):]
}
开发者ID:jchris,项目名称:indexing,代码行数:7,代码来源:scan_pipeline.go
示例7: handleUpdateTopologyForIndex
// handleUpdateTopologyForIndex applies the state, stream and error fields
// flagged in the update message to every index in its list by calling
// UpdateIndexInstance, then acknowledges with MsgSuccess on c.supvCmdch.
// Fields that are not flagged keep their nil/empty defaults. Any update error
// is passed to common.CrashOnError.
func (c *clustMgrAgent) handleUpdateTopologyForIndex(cmd Message) {
	logging.Debugf("ClustMgr:handleUpdateTopologyForIndex %v", cmd)

	update := cmd.(*MsgClustMgrUpdate)
	indexList := update.GetIndexList()
	fields := update.GetUpdatedFields()

	// Defaults used when the corresponding field is not flagged.
	state := common.INDEX_STATE_NIL
	stream := common.NIL_STREAM
	errStr := ""

	for _, index := range indexList {
		if fields.state {
			state = index.State
		}
		if fields.stream {
			stream = index.Stream
		}
		if fields.err {
			errStr = index.Error
		}

		// The build timestamp is always forwarded.
		common.CrashOnError(c.mgr.UpdateIndexInstance(index.Defn.Bucket, index.Defn.DefnId,
			state, stream, errStr, index.BuildTs))
	}

	c.supvCmdch <- &MsgSuccess{}
}
开发者ID:prataprc,项目名称:indexing,代码行数:32,代码来源:cluster_manager_agent.go
示例8: addPartnInfoToProtoInst
// addPartnInfoToProtoInst fills protoInst.SinglePartn with the endpoints of
// indexInst when its partition container is a *c.KeyPartitionContainer; any
// other container type leaves protoInst untouched. For the maint, catchup and
// init streams each endpoint is replaced by the current node's matching
// stream service address; for other stream ids the partition's own endpoints
// are kept. Errors from the cluster info cache are passed to c.CrashOnError.
func addPartnInfoToProtoInst(cfg c.Config, cinfo *c.ClusterInfoCache,
	indexInst c.IndexInst, streamId c.StreamId, protoInst *protobuf.IndexInst) {

	partn, isKeyPartn := indexInst.Pc.(*c.KeyPartitionContainer)
	if !isKeyPartn {
		return
	}

	// Only the SinglePartition structure is filled in for now, as it is the
	// only partition structure supported.
	partnDefn := partn.GetAllPartitions()

	// TODO move this to indexer init. These addresses cannot change.
	// Better to get these once and store.
	cinfo.Lock()
	defer cinfo.Unlock()

	c.CrashOnError(cinfo.Fetch())

	nid := cinfo.GetCurrentNode()
	streamMaintAddr, err := cinfo.GetServiceAddress(nid, "indexStreamMaint")
	c.CrashOnError(err)
	streamInitAddr, err := cinfo.GetServiceAddress(nid, "indexStreamInit")
	c.CrashOnError(err)
	streamCatchupAddr, err := cinfo.GetServiceAddress(nid, "indexStreamCatchup")
	c.CrashOnError(err)

	var endpoints []string
	for _, p := range partnDefn {
		for _, e := range p.Endpoints() {
			// Pick the endpoint that corresponds to streamId.
			switch streamId {
			case c.MAINT_STREAM:
				e = c.Endpoint(streamMaintAddr)
			case c.CATCHUP_STREAM:
				e = c.Endpoint(streamCatchupAddr)
			case c.INIT_STREAM:
				e = c.Endpoint(streamInitAddr)
			}
			endpoints = append(endpoints, string(e))
		}
	}

	protoInst.SinglePartn = &protobuf.SinglePartition{Endpoints: endpoints}
}
开发者ID:jchris,项目名称:indexing,代码行数:46,代码来源:kv_sender.go
示例9: OnIndexDelete
// OnIndexDelete sends a drop request for defnId (used as the instance id) in
// bucket on the admin channel and blocks for the reply. It returns nil on
// MSG_SUCCESS and the reply's cause on MSG_ERROR. Any other reply, or a
// closed response channel, is logged and passed to common.CrashOnError.
func (meta *metaNotifier) OnIndexDelete(defnId common.IndexDefnId, bucket string) error {
	logging.Infof("clustMgrAgent::OnIndexDelete Notification "+
		"Received for Drop IndexId %v", defnId)

	// Treat DefnId as InstId for now.
	request := &MsgDropIndex{
		mType:       CLUST_MGR_DROP_INDEX_DDL,
		indexInstId: common.IndexInstId(defnId),
		respCh:      make(MsgChannel),
		bucket:      bucket,
	}
	meta.adminCh <- request

	// Wait for the response.
	res, ok := <-request.respCh
	if !ok {
		logging.Fatalf("clustMgrAgent::OnIndexDelete Unexpected Channel Close "+
			"for Drop IndexId %v", defnId)
		common.CrashOnError(errors.New("Unknown Response"))
		return nil
	}

	switch res.GetMsgType() {
	case MSG_SUCCESS:
		logging.Infof("clustMgrAgent::OnIndexDelete Success "+
			"for Drop IndexId %v", defnId)
		return nil

	case MSG_ERROR:
		logging.Errorf("clustMgrAgent::OnIndexDelete Error "+
			"for Drop IndexId %v. Error %v", defnId, res)
		return res.(*MsgError).GetError().cause

	default:
		logging.Fatalf("clustMgrAgent::OnIndexDelete Unknown Response "+
			"Received for Drop IndexId %v. Response %v", defnId, res)
		common.CrashOnError(errors.New("Unknown Response"))
	}
	return nil
}
开发者ID:prataprc,项目名称:indexing,代码行数:45,代码来源:cluster_manager_agent.go
示例10: handleIndexerReady
// handleIndexerReady calls NotifyIndexerReady on the manager, passes its
// result to common.CrashOnError, and then acknowledges with MsgSuccess on
// c.supvCmdch.
func (c *clustMgrAgent) handleIndexerReady(cmd Message) {
	logging.Debugf("ClustMgr:handleIndexerReady %v", cmd)

	common.CrashOnError(c.mgr.NotifyIndexerReady())

	c.supvCmdch <- &MsgSuccess{}
}
开发者ID:prataprc,项目名称:indexing,代码行数:9,代码来源:cluster_manager_agent.go
示例11: newIndexEntry
// newIndexEntry decodes b into an IndexEntry, using the primary decoder when
// the snapshot's slice is a primary index and the secondary decoder
// otherwise. Decode errors are passed to common.CrashOnError.
func (s *fdbSnapshot) newIndexEntry(b []byte) IndexEntry {
	if s.slice.isPrimary {
		primary, err := BytesToPrimaryIndexEntry(b)
		common.CrashOnError(err)
		return primary
	}

	secondary, err := BytesToSecondaryIndexEntry(b)
	common.CrashOnError(err)
	return secondary
}
开发者ID:jchris,项目名称:indexing,代码行数:12,代码来源:forestdb_snapshot_reader.go
示例12: handleDeleteBucket
// handleDeleteBucket calls DeleteIndexForBucket on the manager with the
// bucket and stream id carried by the update message, passes the result to
// common.CrashOnError, and acknowledges with MsgSuccess on c.supvCmdch.
func (c *clustMgrAgent) handleDeleteBucket(cmd Message) {
	logging.Debugf("ClustMgr:handleDeleteBucket %v", cmd)

	update := cmd.(*MsgClustMgrUpdate)
	bucket := update.GetBucket()
	streamId := update.GetStreamId()

	common.CrashOnError(c.mgr.DeleteIndexForBucket(bucket, streamId))

	c.supvCmdch <- &MsgSuccess{}
}
开发者ID:prataprc,项目名称:indexing,代码行数:12,代码来源:cluster_manager_agent.go
示例13: handleSupervisorCommands
// handleSupervisorCommands dispatches a message received from the supervisor
// to the handler for its message type. An unrecognised type is logged and
// passed to common.CrashOnError.
// NOTE(review): the original comment states that each operation acquires the
// mutex itself to stay atomic; the handlers are not visible here - confirm.
func (m *mutationMgr) handleSupervisorCommands(cmd Message) {
	switch msgType := cmd.GetMsgType(); msgType {

	// Stream lifecycle.
	case OPEN_STREAM:
		m.handleOpenStream(cmd)
	case ADD_INDEX_LIST_TO_STREAM:
		m.handleAddIndexListToStream(cmd)
	case REMOVE_INDEX_LIST_FROM_STREAM:
		m.handleRemoveIndexListFromStream(cmd)
	case REMOVE_BUCKET_FROM_STREAM:
		m.handleRemoveBucketFromStream(cmd)
	case CLOSE_STREAM:
		m.handleCloseStream(cmd)
	case CLEANUP_STREAM:
		m.handleCleanupStream(cmd)

	// Mutation queue operations.
	case MUT_MGR_PERSIST_MUTATION_QUEUE:
		m.handlePersistMutationQueue(cmd)
	case MUT_MGR_DRAIN_MUTATION_QUEUE:
		m.handleDrainMutationQueue(cmd)
	case MUT_MGR_GET_MUTATION_QUEUE_HWT:
		m.handleGetMutationQueueHWT(cmd)
	case MUT_MGR_GET_MUTATION_QUEUE_LWT:
		m.handleGetMutationQueueLWT(cmd)

	// Map and configuration updates.
	case UPDATE_INDEX_INSTANCE_MAP:
		m.handleUpdateIndexInstMap(cmd)
	case UPDATE_INDEX_PARTITION_MAP:
		m.handleUpdateIndexPartnMap(cmd)
	case MUT_MGR_ABORT_PERSIST:
		m.handleAbortPersist(cmd)
	case CONFIG_SETTINGS_UPDATE:
		m.handleConfigUpdate(cmd)

	default:
		logging.Fatalf("MutationMgr::handleSupervisorCommands Received Unknown Command %v", cmd)
		common.CrashOnError(errors.New("Unknown Command On Supervisor Channel"))
	}
}
开发者ID:jchris,项目名称:indexing,代码行数:53,代码来源:mutation_manager.go
示例14: main
// main is the indexer entry point. It parses the command-line flags, sets up
// logging, optionally initializes cbauth from the -auth flag
// ("user:password"), copies the flag values into the system config, creates
// the storage directory, and calls indexer.NewIndexer.
func main() {
	platform.HideConsole(true)
	defer platform.HideConsole(false)
	common.SeedProcess()

	logging.Infof("Indexer started with command line: %v\n", os.Args)
	flag.Parse()

	logging.SetLogLevel(logging.Level(*logLevel))
	forestdb.Log = &logging.SystemLogger

	// setup cbauth
	if *auth != "" {
		// Split on the first ':' only, so a password that itself contains
		// ':' is passed through intact, and check the shape before indexing
		// so a value without ':' no longer panics with index out of range.
		up := strings.SplitN(*auth, ":", 2)
		if len(up) != 2 {
			// Handled like a failed cbauth init below: log and carry on.
			logging.Fatalf("Failed to initialize cbauth: auth must be of the form user:password")
		} else {
			logging.Tracef("Initializing cbauth with user %v for cluster %v\n", up[0], *cluster)
			if _, err := cbauth.InternalRetryDefaultInit(*cluster, up[0], up[1]); err != nil {
				logging.Fatalf("Failed to initialize cbauth: %s", err)
			}
		}
	}

	go platform.DumpOnSignal()
	go common.ExitOnStdinClose()

	// Copy the command-line settings into the system config.
	config := common.SystemConfig
	config.SetValue("indexer.clusterAddr", *cluster)
	config.SetValue("indexer.numVbuckets", *numVbuckets)
	config.SetValue("indexer.enableManager", *enableManager)
	config.SetValue("indexer.adminPort", *adminPort)
	config.SetValue("indexer.scanPort", *scanPort)
	config.SetValue("indexer.httpPort", *httpPort)
	config.SetValue("indexer.streamInitPort", *streamInitPort)
	config.SetValue("indexer.streamCatchupPort", *streamCatchupPort)
	config.SetValue("indexer.streamMaintPort", *streamMaintPort)
	config.SetValue("indexer.storage_dir", *storageDir)

	// Make sure the storage directory exists before creating the indexer.
	storage_dir := config["indexer.storage_dir"].String()
	if err := os.MkdirAll(storage_dir, 0755); err != nil {
		common.CrashOnError(err)
	}

	_, msg := indexer.NewIndexer(config)
	if msg.GetMsgType() != indexer.MSG_SUCCESS {
		logging.Warnf("Indexer Failure to Init %v", msg)
	}

	logging.Infof("Indexer exiting normally\n")
}
开发者ID:jchris,项目名称:indexing,代码行数:48,代码来源:main.go
示例15: adjustNonSnapAlignedVbs
// adjustNonSnapAlignedVbs rewrites, in place, every vbucket of repairTs whose
// seqno lies outside its snapshot range.
//
// Per the original comment: if a snapshot marker has been received but no
// mutation for that snapshot, the repairTs seqno is outside the snapshot
// marker range and DCP will refuse such a seqno for restart.
//
// For such a vbucket the snapshot range and seqno are replaced, in order of
// preference, from the last flushed TS, the last snap-aligned flushed TS and
// the restart TS of the given stream and bucket, stopping as soon as the
// seqno is inside the range. If none of them yields a valid seqno,
// common.CrashOnError is called.
func (ss *StreamState) adjustNonSnapAlignedVbs(repairTs *common.TsVbuuid,
	streamId common.StreamId, bucket string) {

	for _, vbno := range repairTs.GetVbnos() {

		// outOfRange reports whether the seqno currently held in repairTs for
		// this vbucket falls outside its snapshot range.
		outOfRange := func() bool {
			seq := repairTs.Seqnos[vbno]
			lo, hi := repairTs.Snapshots[vbno][0], repairTs.Snapshots[vbno][1]
			return seq < lo || seq > hi
		}

		if !outOfRange() {
			continue
		}

		// First, use the last flushed TS if available.
		if fts, ok := ss.streamBucketLastFlushedTsMap[streamId][bucket]; ok && fts != nil {
			repairTs.Snapshots[vbno][0] = fts.Snapshots[vbno][0]
			repairTs.Snapshots[vbno][1] = fts.Snapshots[vbno][1]
			repairTs.Seqnos[vbno] = fts.Seqnos[vbno]
		}

		// Still out of bounds: use the last snap-aligned flushed TS if available.
		if outOfRange() {
			if fts, ok := ss.streamBucketLastSnapAlignFlushedTsMap[streamId][bucket]; ok && fts != nil {
				repairTs.Snapshots[vbno][0] = fts.Snapshots[vbno][0]
				repairTs.Snapshots[vbno][1] = fts.Snapshots[vbno][1]
				repairTs.Seqnos[vbno] = fts.Seqnos[vbno]
			}
		}

		// No usable flushed TS (e.g. no flush has been done yet): use the restart TS.
		if outOfRange() {
			if rts, ok := ss.streamBucketRestartTsMap[streamId][bucket]; ok && rts != nil {
				repairTs.Snapshots[vbno][0] = rts.Snapshots[vbno][0]
				repairTs.Snapshots[vbno][1] = rts.Snapshots[vbno][1]
				repairTs.Seqnos[vbno] = rts.Seqnos[vbno]
			}
		}

		// A seqno that is still outside the snapshot range is likely a bug in
		// state management.
		if outOfRange() {
			common.CrashOnError(errors.New("No Valid Restart Seqno Found"))
		}
	}
}
开发者ID:prataprc,项目名称:indexing,代码行数:52,代码来源:stream_state.go
示例16: NewProjector
// NewProjector creates a new projector instance and starts a corresponding
// adminport. maxvbs is stored on the projector; config is first passed
// through c.GetSettingsConfig (an error there goes to c.CrashOnError) and the
// result is then used for the projector's settings. Two goroutines are
// started before returning: mainAdminPort and watcherDameon.
func NewProjector(maxvbs int, config c.Config) *Projector {
	p := new(Projector)
	p.topics = make(map[string]*Feed)
	p.topicSerialize = make(map[string]*sync.Mutex)
	p.maxvbs = maxvbs
	p.pooln = "default" // TODO: should this be configurable ?

	// Setup dynamic configuration propagation.
	config, err := c.GetSettingsConfig(config)
	c.CrashOnError(err)

	section := config.SectionConfig("projector.", true /*trim*/)
	p.name = section["name"].String()
	p.clusterAddr = section["clusterAddr"].String()
	p.adminport = section["adminport.listenAddr"].String()

	// Read the endpoint-factory setting and store it back under the same key.
	factory := config["projector.routerEndpointFactory"]
	config["projector.routerEndpointFactory"] = factory

	p.config = config
	p.ResetConfig(config)

	p.logPrefix = fmt.Sprintf("PROJ[%s]", p.adminport)

	// Apply settings pushed through the settings notifier.
	onSettings := func(cfg c.Config) {
		logging.Infof("%v settings notifier from metakv\n", p.logPrefix)
		cfg.LogConfig(p.logPrefix)
		p.ResetConfig(cfg)
	}
	c.SetupSettingsNotifier(onSettings, make(chan struct{}))

	// Cluster address with an "http://" prefix; it is not read again in the
	// visible part of this function.
	clusterURL := p.clusterAddr
	if !strings.HasPrefix(p.clusterAddr, "http://") {
		clusterURL = "http://" + clusterURL
	}

	adminConfig := config.SectionConfig("projector.adminport.", true)
	adminConfig.SetValue("name", "PRAM")
	requests := make(chan ap.Request)
	p.admind = ap.NewHTTPServer(adminConfig, requests)

	watchInterval := config["projector.watchInterval"].Int()
	staleTimeout := config["projector.staleTimeout"].Int()

	go p.mainAdminPort(requests)
	go p.watcherDameon(watchInterval, staleTimeout)

	logging.Infof("%v started ...\n", p.logPrefix)
	return p
}
开发者ID:jchris,项目名称:indexing,代码行数:50,代码来源:projector.go
示例17: checkFatalDbError
// checkFatalDbError handles an error returned from the DB. A nil err is
// ignored. Any other error is first passed to common.CrashOnError; if that
// call returns and the error text is one of the known fatal DB errors, the
// error is stored in fdb.fatalDbErr.
// NOTE(review): the original comment says the stored error is returned to the
// caller on the next DB operation; that path is not visible here - confirm.
func (fdb *fdbSlice) checkFatalDbError(err error) {
	// Guard against a nil error: calling err.Error() below on a nil
	// interface would be a nil dereference.
	if err == nil {
		return
	}

	// Panic on all DB errors and recover rather than risk an inconsistent
	// DB state.
	common.CrashOnError(err)

	switch err.Error() {
	case "checksum error", "file corruption", "no db instance",
		"alloc fail", "seek fail", "fsync fail":
		fdb.fatalDbErr = err
	}
}
开发者ID:prataprc,项目名称:indexing,代码行数:19,代码来源:forestdb_slice_writer.go
示例18: handleWorkerMessage
// handleWorkerMessage handles a message received from a worker. The known
// stream-reader message types are forwarded unchanged to the supervisor on
// m.supvRespch; any other type is logged and passed to common.CrashOnError.
func (m *mutationMgr) handleWorkerMessage(cmd Message) {
	switch msgType := cmd.GetMsgType(); msgType {
	case STREAM_READER_STREAM_DROP_DATA,
		STREAM_READER_STREAM_BEGIN,
		STREAM_READER_STREAM_END,
		STREAM_READER_ERROR,
		STREAM_READER_CONN_ERROR,
		STREAM_READER_HWT:
		// Let the supervisor decide what to do with the message.
		logging.Tracef("MutationMgr::handleWorkerMessage Received %v from worker", cmd)
		m.supvRespch <- cmd

	default:
		logging.Fatalf("MutationMgr::handleWorkerMessage Received unhandled "+
			"message from worker %v", cmd)
		common.CrashOnError(errors.New("Unknown Message On Worker Channel"))
	}
}
开发者ID:jchris,项目名称:indexing,代码行数:22,代码来源:mutation_manager.go
示例19: createSnapshotWorker
func (s *storageMgr) createSnapshotWorker(streamId common.StreamId, bucket string,
tsVbuuid *common.TsVbuuid, indexSnapMap IndexSnapMap, numVbuckets int,
indexInstMap common.IndexInstMap, indexPartnMap IndexPartnMap, stats *IndexerStats) {
defer destroyIndexSnapMap(indexSnapMap)
var needsCommit bool
snapType := tsVbuuid.GetSnapType()
if snapType == common.DISK_SNAP {
needsCommit = true
}
var wg sync.WaitGroup
//for every index managed by this indexer
for idxInstId, partnMap := range indexPartnMap {
// Create snapshots for all indexes in parallel
wg.Add(1)
go func(idxInstId common.IndexInstId, partnMap PartitionInstMap) {
defer wg.Done()
idxInst := indexInstMap[idxInstId]
idxStats := stats.indexes[idxInst.InstId]
lastIndexSnap := indexSnapMap[idxInstId]
//if index belongs to the flushed bucket and stream
if idxInst.Defn.Bucket == bucket &&
idxInst.Stream == streamId &&
idxInst.State != common.INDEX_STATE_DELETED {
// List of snapshots for reading current timestamp
var isSnapCreated bool = true
partnSnaps := make(map[common.PartitionId]PartitionSnapshot)
//for all partitions managed by this indexer
for partnId, partnInst := range partnMap {
var lastPartnSnap PartitionSnapshot
if lastIndexSnap != nil {
lastPartnSnap = lastIndexSnap.Partitions()[partnId]
}
sc := partnInst.Sc
sliceSnaps := make(map[SliceId]SliceSnapshot)
//create snapshot for all the slices
for _, slice := range sc.GetAllSlices() {
var latestSnapshot Snapshot
if lastIndexSnap.Partitions() != nil {
lastSliceSnap := lastPartnSnap.Slices()[slice.Id()]
latestSnapshot = lastSliceSnap.Snapshot()
}
//if flush timestamp is greater than last
//snapshot timestamp, create a new snapshot
snapTs := NewTimestamp(numVbuckets)
if latestSnapshot != nil {
snapTsVbuuid := latestSnapshot.Timestamp()
snapTs = getSeqTsFromTsVbuuid(snapTsVbuuid)
}
ts := getSeqTsFromTsVbuuid(tsVbuuid)
//if the flush TS is greater than the last snapshot TS
//and slice has some changes. Skip only in-memory snapshot
//in case of unchanged data.
if latestSnapshot == nil || (ts.GreaterThan(snapTs) &&
(slice.IsDirty() || needsCommit)) {
newTsVbuuid := tsVbuuid.Copy()
var err error
var info SnapshotInfo
var newSnapshot Snapshot
logging.Tracef("StorageMgr::handleCreateSnapshot Creating New Snapshot "+
"Index: %v PartitionId: %v SliceId: %v Commit: %v", idxInstId, partnId, slice.Id(), needsCommit)
snapCreateStart := time.Now()
if info, err = slice.NewSnapshot(newTsVbuuid, needsCommit); err != nil {
logging.Errorf("handleCreateSnapshot::handleCreateSnapshot Error "+
"Creating new snapshot Slice Index: %v Slice: %v. Skipped. Error %v", idxInstId,
slice.Id(), err)
isSnapCreated = false
common.CrashOnError(err)
continue
}
snapCreateDur := time.Since(snapCreateStart)
idxStats := stats.indexes[idxInstId]
idxStats.numSnapshots.Add(1)
if needsCommit {
idxStats.numCommits.Add(1)
}
snapOpenStart := time.Now()
if newSnapshot, err = slice.OpenSnapshot(info); err != nil {
logging.Errorf("StorageMgr::handleCreateSnapshot Error Creating Snapshot "+
"for Index: %v Slice: %v. Skipped. Error %v", idxInstId,
slice.Id(), err)
isSnapCreated = false
common.CrashOnError(err)
continue
}
//.........这里部分代码省略.........
开发者ID:jchris,项目名称:indexing,代码行数:101,代码来源:storage_manager.go
注:本文中的github.com/couchbase/indexing/secondary/common.CrashOnError函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论