本文整理汇总了Golang中github.com/couchbase/indexing/secondary/protobuf/projector.TsVbuuid类的典型用法代码示例。如果您正苦于以下问题:Golang TsVbuuid类的具体用法?Golang TsVbuuid怎么用?Golang TsVbuuid使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TsVbuuid类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: sendMutationTopicRequest
// sendMutationTopicRequest sends the MutationTopicRequest for topic on the
// adminport client ap, using reqTimestamps as the only request timestamp and
// "dataport" as the endpoint type.
//
// On success it returns the projector response and a nil error. On failure
// the error is logged and returned together with whatever response value the
// client call produced.
func (k *kvSender) sendMutationTopicRequest(ap *projClient.Client, topic string,
	reqTimestamps *protobuf.TsVbuuid,
	instances []*protobuf.Instance) (*protobuf.TopicResponse, error) {

	bucket := reqTimestamps.GetBucket()

	logging.Infof("KVSender::sendMutationTopicRequest Projector %v Topic %v %v \n\tInstances %v",
		ap, topic, bucket, instances)
	// The Repr method value (not its result) is handed to the lazy logger.
	logging.LazyVerbosef("KVSender::sendMutationTopicRequest RequestTS %v", reqTimestamps.Repr)

	endpointType := "dataport"
	tsList := []*protobuf.TsVbuuid{reqTimestamps}

	res, err := ap.MutationTopicRequest(topic, endpointType, tsList, instances)
	if err != nil {
		logging.Fatalf("KVSender::sendMutationTopicRequest Projector %v Topic %v %v \n\tUnexpected Error %v", ap,
			topic, bucket, err)
		return res, err
	}

	logging.Infof("KVSender::sendMutationTopicRequest Success Projector %v Topic %v %v InstanceIds %v",
		ap, topic, bucket, res.GetInstanceIds())
	// Formatting the timestamps is only done when verbose logging is on.
	if logging.IsEnabled(logging.Verbose) {
		logging.Verbosef("KVSender::sendMutationTopicRequest ActiveTs %v \n\tRollbackTs %v",
			debugPrintTs(res.GetActiveTimestamps(), bucket),
			debugPrintTs(res.GetRollbackTimestamps(), bucket))
	}
	return res, nil
}
开发者ID:jchris,项目名称:indexing,代码行数:29,代码来源:kv_sender.go
示例2: checkVbListInTS
// checkVbListInTS reports whether at least one vbucket number in vbList is
// contained in ts, as decided by ts.Contains. An empty vbList yields false.
func checkVbListInTS(vbList []uint32, ts *protobuf.TsVbuuid) bool {
	for i := range vbList {
		if ts.Contains(uint16(vbList[i])) {
			return true
		}
	}
	return false
}
开发者ID:jchris,项目名称:indexing,代码行数:11,代码来源:kv_sender.go
示例3: StartVbStreams
// StartVbStreams implements Feeder{} interface.
//
// For every vbucket listed in reqTs it issues a DcpRequestStream that starts
// at the vbucket's seqno, carries its vbuuid and snapshot boundaries, and has
// an open-ended (max uint64) end seqno. All vbuckets are attempted even when
// some requests fail; the error of the last failing request, if any, is
// returned.
func (bdcp *bucketDcp) StartVbStreams(
	opaque uint16, reqTs *protobuf.TsVbuuid) error {

	var lastErr error

	if bdcp.bucket != nil {
		bdcp.bucket.Refresh()
	}

	vbuuids, seqnos := reqTs.GetVbuuids(), reqTs.GetSeqnos()
	snapshots := reqTs.GetSnapshots()

	for i, vbno := range c.Vbno32to16(reqTs.GetVbnos()) {
		noFlags, vbuuid := uint32(0), vbuuids[i]
		startSeqno, endSeqno := seqnos[i], uint64(0xFFFFFFFFFFFFFFFF)
		snapStart, snapEnd := snapshots[i].GetStart(), snapshots[i].GetEnd()

		reqErr := bdcp.dcpFeed.DcpRequestStream(
			vbno, opaque, noFlags, vbuuid, startSeqno, endSeqno, snapStart, snapEnd)
		if reqErr != nil {
			lastErr = reqErr
		}

		// FIXME/TODO: the sleep below avoids back-to-back dispatch of
		// StreamRequest to DCP, which seems to cause some problems.
		time.Sleep(time.Millisecond)
	}
	return lastErr
}
开发者ID:prataprc,项目名称:indexing,代码行数:27,代码来源:dcp.go
示例4: EndVbStreams
// EndVbStreams implements Feeder{} interface.
//
// It calls DcpCloseStream for every vbucket listed in ts. Every vbucket is
// attempted; the error of the last failing close, if any, is returned.
func (bdcp *bucketDcp) EndVbStreams(
	opaque uint16, ts *protobuf.TsVbuuid) (err error) {

	if bdcp.bucket != nil {
		bdcp.bucket.Refresh()
	}

	for _, vbno := range c.Vbno32to16(ts.GetVbnos()) {
		closeErr := bdcp.dcpFeed.DcpCloseStream(vbno, opaque)
		if closeErr == nil {
			continue
		}
		err = closeErr
	}
	return err
}
开发者ID:prataprc,项目名称:indexing,代码行数:15,代码来源:dcp.go
示例5: findTimestampOffsetForVb
// findTimestampOffsetForVb returns the position of vbno inside ts.GetVbnos().
// It returns -1 when ts is nil or when vbno is not listed in the timestamp.
func findTimestampOffsetForVb(ts *protobuf.TsVbuuid, vbno uint32) int {
	if ts != nil {
		vbnos := ts.GetVbnos()
		for offset := 0; offset < len(vbnos); offset++ {
			if vbnos[offset] == vbno {
				return offset
			}
		}
	}
	return -1
}
开发者ID:jchris,项目名称:indexing,代码行数:18,代码来源:stream_admin.go
示例6: updateCurrentTsFromResponse
// updateCurrentTsFromResponse folds the current timestamps carried by res
// into currentTs and returns the result. Only non-nil, non-empty timestamps
// whose bucket equals bucket are considered: the first one is cloned when
// currentTs is still nil, later ones are merged in with Union.
func updateCurrentTsFromResponse(bucket string,
	currentTs *protobuf.TsVbuuid, res *protobuf.TimestampResponse) *protobuf.TsVbuuid {

	for _, ts := range res.GetCurrentTimestamps() {
		if ts == nil || ts.IsEmpty() || ts.GetBucket() != bucket {
			continue
		}
		if currentTs != nil {
			currentTs = currentTs.Union(ts)
		} else {
			currentTs = ts.Clone()
		}
	}
	return currentTs
}
开发者ID:jchris,项目名称:indexing,代码行数:16,代码来源:kv_sender.go
示例7: updateActiveTsFromResponse
// updateActiveTsFromResponse folds the active timestamps carried by res into
// activeTs and returns the result. Only non-nil, non-empty timestamps whose
// bucket equals bucket are considered: the first one is cloned when activeTs
// is still nil, later ones are merged in with Union.
func updateActiveTsFromResponse(bucket string,
	activeTs *protobuf.TsVbuuid, res *protobuf.TopicResponse) *protobuf.TsVbuuid {

	for _, ts := range res.GetActiveTimestamps() {
		if ts == nil || ts.IsEmpty() || ts.GetBucket() != bucket {
			continue
		}
		if activeTs != nil {
			activeTs = activeTs.Union(ts)
		} else {
			activeTs = ts.Clone()
		}
	}
	return activeTs
}
开发者ID:jchris,项目名称:indexing,代码行数:16,代码来源:kv_sender.go
示例8: updateRollbackTsFromResponse
// updateRollbackTsFromResponse folds the rollback timestamps carried by res
// into rollbackTs and returns the result. Only non-nil, non-empty timestamps
// whose bucket equals bucket are considered: the first one is cloned when
// rollbackTs is still nil, later ones are merged in with Union.
func updateRollbackTsFromResponse(bucket string,
	rollbackTs *protobuf.TsVbuuid, res *protobuf.TopicResponse) *protobuf.TsVbuuid {

	for _, ts := range res.GetRollbackTimestamps() {
		if ts == nil || ts.IsEmpty() || ts.GetBucket() != bucket {
			continue
		}
		if rollbackTs != nil {
			rollbackTs = rollbackTs.Union(ts)
		} else {
			rollbackTs = ts.Clone()
		}
	}
	return rollbackTs
}
开发者ID:jchris,项目名称:indexing,代码行数:17,代码来源:kv_sender.go
示例9: unmarshallTimestamp
// unmarshallTimestamp decodes str (base64, standard encoding) into a
// protobuf TsVbuuid and converts it to a common.TsVbuuid sized for NUM_VB
// vbuckets.
//
// It returns the decoding or unmarshalling error, if any. On success the
// target's Seqnos and Vbuuids are filled in at the slot of each vbucket
// number listed in the source timestamp.
func unmarshallTimestamp(str string) (*common.TsVbuuid, error) {
	data, err := base64.StdEncoding.DecodeString(str)
	if err != nil {
		return nil, err
	}

	source := new(protobuf.TsVbuuid)
	if err := proto.Unmarshal(data, source); err != nil {
		return nil, err
	}

	target := common.NewTsVbuuid(source.GetBucket(), NUM_VB)
	// The protobuf arrays are parallel to Vbnos, so entry i of Seqnos and
	// Vbuuids belongs to vbucket Vbnos[i]. Reading them by position (not by
	// vbucket number) keeps partial or unordered timestamps correct.
	// NOTE(review): vbno is assumed to be below NUM_VB — confirm at callers.
	for i, vbno := range source.Vbnos {
		target.Seqnos[vbno] = source.Seqnos[i]
		target.Vbuuids[vbno] = source.Vbuuids[i]
	}
	return target, nil
}
开发者ID:jchris,项目名称:indexing,代码行数:21,代码来源:timer.go
示例10: recomputeRequestTimestamp
// recomputeRequestTimestamp builds a new request timestamp from requestTs
// and the rollback timestamps returned by the projector.
//
// For every vbucket in requestTs, the entry is taken from the rollback
// timestamp of the same bucket when that timestamp lists the vbucket;
// otherwise the entry is copied unchanged from requestTs. The freshly built
// timestamp is always returned.
func recomputeRequestTimestamp(requestTs *protobuf.TsVbuuid,
	rollbackTimestamps []*protobuf.TsVbuuid) *protobuf.TsVbuuid {

	bucket := requestTs.GetBucket()
	vbnos := requestTs.GetVbnos()

	newTs := protobuf.NewTsVbuuid(DEFAULT_POOL_NAME, bucket, len(vbnos))
	rollbackTs := findTimestampForBucket(rollbackTimestamps, bucket)

	for i, vbno := range vbnos {
		// Default source is the original request entry at the same position.
		src, at := requestTs, i
		if offset := findTimestampOffsetForVb(rollbackTs, vbno); offset != -1 {
			// The rollback timestamp has an entry for this vbno; retry with it.
			src, at = rollbackTs, offset
		}
		newTs.Append(uint16(vbno), src.Seqnos[at], src.Vbuuids[at],
			src.Snapshots[at].GetStart(), src.Snapshots[at].GetEnd())
	}
	return newTs
}
开发者ID:jchris,项目名称:indexing,代码行数:25,代码来源:stream_admin.go
示例11: scatterMutation
// scatterMutation dispatches a single DCP event m to the vbucket routine
// that owns m.VBucket, creating or removing that routine on stream
// request / stream end.
//
//   - DCP_STREAMREQ: on success, and when no routine exists yet for the
//     vbucket, a new VbucketRoutine is created (seeded with the latest
//     failover-log vbuuid and the seqno ts holds for the vbucket), engines are
//     added, the event is forwarded and the routine is registered. The feed is
//     notified via PostStreamRequest in every case.
//   - DCP_STREAMEND: on success the event is forwarded and the routine is
//     removed. The feed is notified via PostStreamEnd in every case.
//   - DCP_SNAPSHOT, DCP_MUTATION, DCP_DELETION, DCP_EXPIRATION: the event is
//     forwarded to the existing routine; an unknown vbucket is logged.
//
// Failures from Latest, AddEngines and Event are raised as panics carrying
// the actual failure value. The named result err is only ever assigned by the
// Latest call, whose failure panics, so it is nil on normal return.
func (kvdata *KVData) scatterMutation(
	m *mc.DcpEvent, ts *protobuf.TsVbuuid) (err error) {

	vbno := m.VBucket

	switch m.Opcode {
	case mcd.DCP_STREAMREQ:
		if m.Status == mcd.ROLLBACK {
			fmsg := "%v ##%x StreamRequest ROLLBACK: %v\n"
			logging.Infof(fmsg, kvdata.logPrefix, m.Opaque, m)

		} else if m.Status != mcd.SUCCESS {
			fmsg := "%v ##%x StreamRequest %s: %v\n"
			logging.Errorf(fmsg, kvdata.logPrefix, m.Opaque, m.Status, m)

		} else if _, ok := kvdata.vrs[vbno]; ok {
			fmsg := "%v ##%x duplicate OpStreamRequest: %v\n"
			logging.Errorf(fmsg, kvdata.logPrefix, m.Opaque, m)

		} else if m.VBuuid, _, err = m.FailoverLog.Latest(); err != nil {
			panic(err)

		} else {
			fmsg := "%v ##%x StreamRequest: %v\n"
			logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, m)
			topic, bucket := kvdata.topic, kvdata.bucket
			// The second result of SeqnoFor is deliberately ignored, as before.
			m.Seqno, _ = ts.SeqnoFor(vbno)
			cluster, config := kvdata.feed.cluster, kvdata.config
			vr := NewVbucketRoutine(
				cluster, topic, bucket,
				m.Opaque, vbno, m.VBuuid, m.Seqno, config)
			if _, addErr := vr.AddEngines(0xFFFF, kvdata.engines, kvdata.endpoints); addErr != nil {
				panic(addErr)
			}
			// Panic with the value returned by Event; the previous code
			// panicked with an unrelated, nil err and lost the real failure.
			if evErr := vr.Event(m); evErr != nil {
				panic(evErr)
			}
			kvdata.vrs[vbno] = vr
		}
		kvdata.feed.PostStreamRequest(kvdata.bucket, m)

	case mcd.DCP_STREAMEND:
		if vr, ok := kvdata.vrs[vbno]; !ok {
			fmsg := "%v ##%x duplicate OpStreamEnd: %v\n"
			logging.Errorf(fmsg, kvdata.logPrefix, m.Opaque, m)

		} else if m.Status != mcd.SUCCESS {
			// m.Status feeds the %s verb, which previously had no operand.
			fmsg := "%v ##%x StreamEnd %s: %v\n"
			logging.Errorf(fmsg, kvdata.logPrefix, m.Opaque, m.Status, m)

		} else {
			fmsg := "%v ##%x StreamEnd: %v\n"
			logging.Tracef(fmsg, kvdata.logPrefix, m.Opaque, m)
			if evErr := vr.Event(m); evErr != nil {
				panic(evErr)
			}
			delete(kvdata.vrs, vbno)
		}
		kvdata.feed.PostStreamEnd(kvdata.bucket, m)

	case mcd.DCP_SNAPSHOT:
		if vr, ok := kvdata.vrs[vbno]; !ok {
			fmsg := "%v ##%x unknown vbucket: %v\n"
			logging.Fatalf(fmsg, kvdata.logPrefix, m.Opaque, m)
		} else if evErr := vr.Event(m); evErr != nil {
			panic(evErr)
		}

	case mcd.DCP_MUTATION, mcd.DCP_DELETION, mcd.DCP_EXPIRATION:
		if vr, ok := kvdata.vrs[vbno]; !ok {
			fmsg := "%v ##%x unknown vbucket: %v\n"
			logging.Fatalf(fmsg, kvdata.logPrefix, m.Opaque, m)
		} else if evErr := vr.Event(m); evErr != nil {
			panic(evErr)
		}
	}
	return err
}
开发者ID:jchris,项目名称:indexing,代码行数:79,代码来源:kvdata.go
示例12: runScatter
// runScatter is the go-routine that handles the data path. It loops over
// three sources: DCP events arriving on mutch (dispatched through
// scatterMutation with ts), the heartbeat timer, and control commands
// arriving on kvdata.sbch. The loop ends when mutch is closed.
//
// NOTE(review): the listing of this function is truncated in this excerpt;
// the comments cover only the visible part.
func (kvdata *KVData) runScatter(
ts *protobuf.TsVbuuid, mutch <-chan *mc.DcpEvent) {
// NOTE: panic will bubble up from vbucket-routine to kvdata.
// The deferred function logs a recovered panic with a stack trace and then,
// whether or not there was a panic, publishes stream-end, notifies the feed,
// closes finch and logs that the routine stopped.
defer func() {
if r := recover(); r != nil {
fmsg := "%v ##%x runScatter() crashed: %v\n"
logging.Errorf(fmsg, kvdata.logPrefix, kvdata.opaque, r)
logging.Errorf("%s", logging.StackTrace())
}
kvdata.publishStreamEnd()
kvdata.feed.PostFinKVdata(kvdata.bucket)
close(kvdata.finch)
logging.Infof("%v ##%x ... stopped\n", kvdata.logPrefix, kvdata.opaque)
}()
// stats
eventCount, addCount, delCount := int64(0), int64(0), int64(0)
tsCount := int64(0)
// first heartbeat fires after kvdata.syncTimeout
heartBeat := time.After(kvdata.syncTimeout)
fmsg := "%v ##%x heartbeat (%v) loaded ...\n"
logging.Infof(fmsg, kvdata.logPrefix, kvdata.opaque, kvdata.syncTimeout)
loop:
for {
select {
case m, ok := <-mutch:
if ok == false { // upstream has closed
break loop
}
// the value returned by scatterMutation is not inspected here
kvdata.scatterMutation(m, ts)
eventCount++
case <-heartBeat:
// copy the current vbucket routines into a slice for the go-routine below
vrs := make([]*VbucketRoutine, 0, len(kvdata.vrs))
for _, vr := range kvdata.vrs {
vrs = append(vrs, vr)
}
// a nil channel never becomes ready in select, so this case stays
// disabled until heartBeat is assigned again (presumably as a result of
// ReloadHeartbeat below — that code is not visible here)
heartBeat = nil
// propagate the sync-pulse via separate routine so that
// the data-path is not blocked.
go func() {
// during cleanup, as long as the vbucket-routines are
// shutdown this routine will eventually exit.
for _, vr := range vrs {
vr.SyncPulse()
}
if err := kvdata.ReloadHeartbeat(); err != nil {
fmsg := "%v ##%x ReloadHeartbeat(): %v\n"
logging.Errorf(fmsg, kvdata.logPrefix, kvdata.opaque, err)
}
}()
case msg := <-kvdata.sbch:
// msg[0] is the command byte; the remaining slots depend on the command
cmd := msg[0].(byte)
switch cmd {
case kvCmdAddEngines:
// layout: msg[1] opaque, msg[2] engines map (may be nil),
// msg[3] endpoints map (may be nil), msg[4] response channel
opaque := msg[1].(uint16)
respch := msg[4].(chan []interface{})
if msg[2] != nil {
for uuid, engine := range msg[2].(map[uint64]*Engine) {
if _, ok := kvdata.engines[uuid]; !ok {
fmsg := "%v ##%x new engine added %v"
logging.Infof(fmsg, kvdata.logPrefix, opaque, uuid)
}
kvdata.engines[uuid] = engine
}
}
if msg[3] != nil {
rv := msg[3].(map[string]c.RouterEndpoint)
for raddr, endp := range rv {
fmsg := "%v ##%x updated endpoint %q"
logging.Infof(fmsg, kvdata.logPrefix, opaque, raddr)
kvdata.endpoints[raddr] = endp
}
}
// push the merged engines/endpoints to every vbucket routine and
// collect the seqno each AddEngines call returns, keyed by vbno
curSeqnos := make(map[uint16]uint64)
if kvdata.engines != nil || kvdata.endpoints != nil {
engines, endpoints := kvdata.engines, kvdata.endpoints
for _, vr := range kvdata.vrs {
curSeqno, err := vr.AddEngines(opaque, engines, endpoints)
if err != nil {
panic(err)
}
curSeqnos[vr.vbno] = curSeqno
}
}
addCount++
respch <- []interface{}{curSeqnos, nil}
case kvCmdDelEngines:
// layout: msg[1] opaque, msg[2] engine keys to delete,
// msg[3] response channel
opaque := msg[1].(uint16)
engineKeys := msg[2].([]uint64)
respch := msg[3].(chan []interface{})
for _, vr := range kvdata.vrs {
if err := vr.DeleteEngines(opaque, engineKeys); err != nil {
panic(err)
//.........这里部分代码省略.........
开发者ID:jchris,项目名称:indexing,代码行数:101,代码来源:kvdata.go
示例13: sendRestartVbuckets
// sendRestartVbuckets sends a RestartVbuckets request for topic and restartTs
// to the projector behind ap.
//
// When connErrVbs is non-empty, a best-effort ShutdownVbuckets request for
// the timestamp computed by computeShutdownTs is sent first; its error is
// logged and otherwise ignored. The RestartVbuckets response is returned with
// a nil error on success; on failure the error is logged and returned along
// with whatever response value the client call produced.
func (k *kvSender) sendRestartVbuckets(ap *projClient.Client,
	topic string, connErrVbs []Vbucket,
	restartTs *protobuf.TsVbuuid) (*protobuf.TopicResponse, error) {

	bucket := restartTs.GetBucket()

	logging.Infof("KVSender::sendRestartVbuckets Projector %v Topic %v %v", ap, topic, bucket)
	logging.LazyVerbosef("KVSender::sendRestartVbuckets RestartTs %v", restartTs.Repr)

	// Shutdown the vbucket before restart if there was a ConnErr. If the
	// vbucket is already running, projector will ignore the request otherwise.
	if len(connErrVbs) > 0 {
		logging.Infof("KVSender::sendRestartVbuckets ShutdownVbuckets %v Topic %v %v ConnErrVbs %v",
			ap, topic, bucket, connErrVbs)

		// Only the VBs that received a connection error are shut down. It is
		// probably not harmful to shut down every VB in the repairTS,
		// including those that only received StreamEnd. But due to network /
		// projector latency, a VB StreamBegin may be on its way for those VBs
		// (especially when RepairStream has already retried a couple of
		// times), so shutting down all VBs in restartTs may cause an
		// unnecessary race condition and make the protocol take longer to
		// converge. ShutdownVbuckets should have no effect on a projector
		// that does not own the VB.
		shutdownTs := k.computeShutdownTs(restartTs, connErrVbs)

		logging.Infof("KVSender::sendRestartVbuckets ShutdownVbuckets Projector %v Topic %v %v \n\tShutdownTs %v",
			ap, topic, bucket, shutdownTs.Repr())

		shutdownErr := ap.ShutdownVbuckets(topic, []*protobuf.TsVbuuid{shutdownTs})
		if shutdownErr != nil {
			// All ShutdownVbuckets errors are treated as success because it is
			// a best-effort call. RestartVbuckets errors will be acted upon.
			logging.Errorf("KVSender::sendRestartVbuckets Unexpected Error During "+
				"ShutdownVbuckets Request for Projector %v Topic %v. Err %v.", ap,
				topic, shutdownErr)
		}
	}

	res, err := ap.RestartVbuckets(topic, []*protobuf.TsVbuuid{restartTs})
	if err != nil {
		logging.Fatalf("KVSender::sendRestartVbuckets Unexpected Error During "+
			"Restart Vbuckets Request for Projector %v Topic %v %v . Err %v.", ap,
			topic, bucket, err)
		return res, err
	}

	logging.Infof("KVSender::sendRestartVbuckets Success Projector %v Topic %v %v", ap, topic, bucket)
	if logging.IsEnabled(logging.Verbose) {
		logging.Verbosef("KVSender::sendRestartVbuckets \nActiveTs %v \nRollbackTs %v",
			debugPrintTs(res.GetActiveTimestamps(), bucket),
			debugPrintTs(res.GetRollbackTimestamps(), bucket))
	}
	return res, nil
}
开发者ID:jchris,项目名称:indexing,代码行数:52,代码来源:kv_sender.go
示例14: addIndexForExistingBucket
// addIndexForExistingBucket sends an add-instances request for indexInstList
// to every projector and reports the outcome on respCh.
//
// The request round is retried through the retry helper until the merged
// current timestamp for bucket covers numVbuckets vbuckets and no projector
// returned an error. On success a MsgStreamUpdate carrying the merged
// timestamp (converted to native format) is sent; a failure to fetch the
// projector addresses or exhausting the retries sends a FATAL MsgError.
func (k *kvSender) addIndexForExistingBucket(streamId c.StreamId, bucket string, indexInstList []c.IndexInst,
	respCh MsgChannel, stopCh StopChannel) {

	// reportFatal sends the FATAL stream-request error used by both failure paths.
	reportFatal := func(cause error) {
		respCh <- &MsgError{
			err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
				severity: FATAL,
				cause:    cause}}
	}

	addrs, err := k.getAllProjectorAddrs()
	if err != nil {
		logging.Errorf("KVSender::addIndexForExistingBucket %v %v Error in fetching cluster info %v",
			streamId, bucket, err)
		reportFatal(err)
		return
	}

	var currentTs *protobuf.TsVbuuid
	protoInstList := convertIndexListToProto(k.config, k.cInfoCache, indexInstList, streamId)
	topic := getTopicForStreamId(streamId)

	attempt := func(r int, err error) error {
		// clear the error before every retry
		err = nil

		for _, addr := range addrs {
			execWithStopCh(func() {
				ap := newProjClient(addr)
				res, ret := sendAddInstancesRequest(ap, topic, protoInstList)
				if ret != nil {
					logging.Errorf("KVSender::addIndexForExistingBucket %v %v Error Received %v from %v",
						streamId, bucket, ret, addr)
					err = ret
					return
				}
				currentTs = updateCurrentTsFromResponse(bucket, currentTs, res)
			}, stopCh)
		}

		// check if we have received currentTs for all vbuckets
		numVbuckets := k.config["numVbuckets"].Int()
		if currentTs != nil && currentTs.Len() == numVbuckets {
			return err
		}
		return errors.New("ErrPartialVbStart")
	}

	rh := c.NewRetryHelper(MAX_KV_REQUEST_RETRY, time.Second, BACKOFF_FACTOR, attempt)
	if err = rh.Run(); err != nil {
		logging.Errorf("KVSender::addIndexForExistingBucket %v %v Error Received %v",
			streamId, bucket, err)
		reportFatal(err)
		return
	}

	numVbuckets := k.config["numVbuckets"].Int()
	nativeTs := currentTs.ToTsVbuuid(numVbuckets)

	respCh <- &MsgStreamUpdate{mType: MSG_SUCCESS,
		streamId:  streamId,
		bucket:    bucket,
		restartTs: nativeTs}
}
开发者ID:jchris,项目名称:indexing,代码行数:65,代码来源:kv_sender.go
示例15: restartVbuckets
// restartVbuckets sends a restart-vbuckets request, built from restartTs, to
// every projector address returned for the vbuckets in restartTs, and
// reports the outcome on respCh.
//
// The request round is retried through the retry helper while any projector
// returns an error, unless one of the requested vbuckets shows up in the
// merged rollback timestamp. The message sent on respCh is:
//   - MsgRollback with the rollback timestamp (native format) on rollback;
//   - MsgKVStreamRepair when the final error text matches ErrorTopicMissing,
//     ErrorClosed or ErrorInvalidBucket (a fresh MutationTopicRequest is
//     needed);
//   - a FATAL MsgError for any other error, or when the projector addresses
//     cannot be fetched;
//   - MsgSuccess otherwise.
//
// stopCh is accepted but not used by this function.
func (k *kvSender) restartVbuckets(streamId c.StreamId, restartTs *c.TsVbuuid,
	connErrVbs []Vbucket, respCh MsgChannel, stopCh StopChannel) {

	addrs, err := k.getProjAddrsForVbuckets(restartTs.Bucket, restartTs.GetVbnos())
	if err != nil {
		logging.Errorf("KVSender::restartVbuckets %v %v Error in fetching cluster info %v",
			streamId, restartTs.Bucket, err)

		respCh <- &MsgError{
			err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
				severity: FATAL,
				cause:    err}}
		return
	}

	//convert TS to protobuf format
	var protoRestartTs *protobuf.TsVbuuid
	numVbuckets := k.config["numVbuckets"].Int()
	protoTs := protobuf.NewTsVbuuid(DEFAULT_POOL, restartTs.Bucket, numVbuckets)
	protoRestartTs = protoTs.FromTsVbuuid(restartTs)

	var rollbackTs *protobuf.TsVbuuid
	topic := getTopicForStreamId(streamId)
	rollback := false

	fn := func(r int, err error) error {
		// Clear the error before every retry, as the other retry callbacks in
		// this file do. Otherwise an error passed in from an earlier attempt
		// would be returned even when every projector succeeds this time.
		// NOTE(review): assumes the retry helper passes the previous
		// attempt's error as the err argument — confirm against RetryHelper.
		err = nil

		for _, addr := range addrs {
			ap := newProjClient(addr)

			if res, ret := k.sendRestartVbuckets(ap, topic, connErrVbs, protoRestartTs); ret != nil {
				//retry for all errors
				logging.Errorf("KVSender::restartVbuckets %v %v Error Received %v from %v",
					streamId, restartTs.Bucket, ret, addr)
				err = ret
			} else {
				rollbackTs = updateRollbackTsFromResponse(restartTs.Bucket, rollbackTs, res)
			}
		}

		if rollbackTs != nil && checkVbListInTS(protoRestartTs.GetVbnos(), rollbackTs) {
			//if rollback, no need to retry
			rollback = true
			return nil
		}
		return err
	}

	rh := c.NewRetryHelper(MAX_KV_REQUEST_RETRY, time.Second, BACKOFF_FACTOR, fn)
	err = rh.Run()

	//if any of the requested vb is in rollback ts, send rollback
	//msg to caller
	if rollback {
		//convert from protobuf to native format
		nativeTs := rollbackTs.ToTsVbuuid(numVbuckets)

		respCh <- &MsgRollback{streamId: streamId,
			rollbackTs: nativeTs}
	} else if err != nil {
		//if there is a topicMissing/genServer.Closed error, a fresh
		//MutationTopicRequest is required.
		if err.Error() == projClient.ErrorTopicMissing.Error() ||
			err.Error() == c.ErrorClosed.Error() ||
			err.Error() == projClient.ErrorInvalidBucket.Error() {
			respCh <- &MsgKVStreamRepair{
				streamId: streamId,
				bucket:   restartTs.Bucket,
			}
		} else {
			respCh <- &MsgError{
				err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
					severity: FATAL,
					cause:    err}}
		}
	} else {
		respCh <- &MsgSuccess{}
	}
}
开发者ID:jchris,项目名称:indexing,代码行数:81,代码来源:kv_sender.go
示例16: openMutationStream
// openMutationStream sends a mutation-topic request for indexInstList to
// every projector and reports the outcome on respCh. An empty indexInstList
// is answered with MsgSuccess right away. Failures while fetching vbuckets,
// building the restart timestamp, or fetching projector addresses are
// answered with a FATAL MsgError.
//
// NOTE(review): the listing of this function is truncated in this excerpt;
// the comments cover only the visible part.
func (k *kvSender) openMutationStream(streamId c.StreamId, indexInstList []c.IndexInst,
restartTs *c.TsVbuuid, respCh MsgChannel, stopCh StopChannel) {
if len(indexInstList) == 0 {
logging.Warnf("KVSender::openMutationStream Empty IndexList. Nothing to do.")
respCh <- &MsgSuccess{}
return
}
protoInstList := convertIndexListToProto(k.config, k.cInfoCache, indexInstList, streamId)
// the bucket of the first index instance is used for the whole request
bucket := indexInstList[0].Defn.Bucket
//use any bucket as list of vbs remain the same for all buckets
vbnos, err := k.getAllVbucketsInCluster(bucket)
if err != nil {
// NOTE(review): this log reads restartTs.Bucket while the other error
// paths log bucket — confirm restartTs cannot be nil here.
logging.Errorf("KVSender::openMutationStream %v %v Error in fetching vbuckets info %v",
streamId, restartTs.Bucket, err)
respCh <- &MsgError{
err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
severity: FATAL,
cause: err}}
return
}
restartTsList, err := k.makeRestartTsForVbs(bucket, restartTs, vbnos)
if err != nil {
logging.Errorf("KVSender::openMutationStream %v %v Error making restart ts %v",
streamId, bucket, err)
respCh <- &MsgError{
err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
severity: FATAL,
cause: err}}
return
}
addrs, err := k.getAllProjectorAddrs()
if err != nil {
logging.Errorf("KVSender::openMutationStream %v %v Error Fetching Projector Addrs %v",
streamId, bucket, err)
respCh <- &MsgError{
err: Error{code: ERROR_KVSENDER_STREAM_REQUEST_ERROR,
severity: FATAL,
cause: err}}
return
}
// rollbackTs and activeTs accumulate the per-bucket timestamps returned by
// the projectors across all addresses and all retry attempts
var rollbackTs *protobuf.TsVbuuid
var activeTs *protobuf.TsVbuuid
topic := getTopicForStreamId(streamId)
// fn is one retry attempt: it sends the request to every projector address
// and decides whether another attempt is needed
fn := func(r int, err error) error {
//clear the error before every retry
err = nil
for _, addr := range addrs {
execWithStopCh(func() {
ap := newProjClient(addr)
if res, ret := k.sendMutationTopicRequest(ap, topic, restartTsList, protoInstList); ret != nil {
//for all errors, retry
logging.Errorf("KVSender::openMutationStream %v %v Error Received %v from %v",
streamId, bucket, ret, addr)
err = ret
} else {
activeTs = updateActiveTsFromResponse(bucket, activeTs, res)
rollbackTs = updateRollbackTsFromResponse(bucket, rollbackTs, res)
}
}, stopCh)
}
if rollbackTs != nil {
//no retry required for rollback
return nil
} else if err != nil {
//retry for any error
return err
} else {
//check if we have received activeTs for all vbuckets
retry := false
if activeTs == nil || activeTs.Len() != len(vbnos) {
retry = true
}
if retry {
return errors.New("ErrPartialVbStart")
} else {
return nil
}
}
}
rh := c.NewRetryHelper(MAX_KV_REQUEST_RETRY, time.Second, BACKOFF_FACTOR, fn)
err = rh.Run()
// a rollback timestamp takes precedence over the error returned by Run
if rollbackTs != nil {
logging.Infof("KVSender::openMutationStream %v %v Rollback Received %v",
streamId, bucket, rollbackTs)
//convert from protobuf to native format
numVbuckets := k.config["numVbuckets"].Int()
//.........这里部分代码省略.........
开发者ID:jchris,项目名称:indexing,代码行数:101,代码来源:kv_sender.go
注:本文中的github.com/couchbase/indexing/secondary/protobuf/projector.TsVbuuid类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论