本文整理汇总了Golang中github.com/cockroachdb/cockroach/client.SenderOpt函数的典型用法代码示例。如果您正苦于以下问题：Golang SenderOpt函数的具体用法？Golang SenderOpt怎么用？Golang SenderOpt使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了SenderOpt函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: BootstrapCluster
// BootstrapCluster bootstraps multiple stores using the provided engines and
// cluster ID. Every store is assigned to node 1 and gets store ID i+1, where
// i is the index of its engine. The first bootstrapped store contains a
// single range spanning all keys. Initial range lookup metadata is populated
// for the range.
//
// After each store has been started and added to the local sender, the node
// ID allocator (first store only) and the store ID allocator are advanced
// and checked against the IDs assigned above; a mismatch or an allocation
// error aborts the bootstrap.
//
// Returns a KV client for unittest purposes. Caller should close the returned
// client.
func BootstrapCluster(clusterID string, engines []engine.Engine, stopper *stop.Stopper) (*client.DB, error) {
	ctx := storage.StoreContext{}
	ctx.ScanInterval = 10 * time.Minute
	ctx.Clock = hlc.NewClock(hlc.UnixNano)
	// Create a KV DB with a local sender.
	lSender := kv.NewLocalSender()
	sender := kv.NewTxnCoordSender(lSender, ctx.Clock, false, nil, stopper)
	var err error
	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	if ctx.DB, err = client.Open("//[email protected]", client.SenderOpt(sender)); err != nil {
		return nil, err
	}
	ctx.Transport = multiraft.NewLocalRPCTransport(stopper)
	for i, eng := range engines {
		sIdent := proto.StoreIdent{
			ClusterID: clusterID,
			NodeID:    1,
			StoreID:   proto.StoreID(i + 1),
		}
		// The bootstrapping store will not connect to other nodes so its
		// StoreConfig doesn't really matter.
		s := storage.NewStore(ctx, eng, &proto.NodeDescriptor{NodeID: 1})
		// Verify the store isn't already part of a cluster.
		if len(s.Ident.ClusterID) > 0 {
			return nil, util.Errorf("storage engine already belongs to a cluster (%s)", s.Ident.ClusterID)
		}
		// Bootstrap store to persist the store ident.
		if err := s.Bootstrap(sIdent, stopper); err != nil {
			return nil, err
		}
		// Create first range, writing directly to engine. Note this does
		// not create the range, just its data. Only do this if this is the
		// first store.
		if i == 0 {
			if err := s.BootstrapRange(); err != nil {
				return nil, err
			}
		}
		if err := s.Start(stopper); err != nil {
			return nil, err
		}
		lSender.AddStore(s)
		// Initialize node and store ids. Only initialize the node once.
		//
		// The error is formatted with %v rather than %s: this branch is also
		// taken when err is nil and only the ID differs, and %s would render
		// a nil error as "%!s(<nil>)".
		if i == 0 {
			if nodeID, err := allocateNodeID(ctx.DB); nodeID != sIdent.NodeID || err != nil {
				return nil, util.Errorf("expected to initialize node id allocator to %d, got %d: %v",
					sIdent.NodeID, nodeID, err)
			}
		}
		if storeID, err := allocateStoreIDs(sIdent.NodeID, 1, ctx.DB); storeID != sIdent.StoreID || err != nil {
			return nil, util.Errorf("expected to initialize store id allocator to %d, got %d: %v",
				sIdent.StoreID, storeID, err)
		}
	}
	return ctx.DB, nil
}
开发者ID:arypurnomoz,项目名称:cockroach,代码行数:66,代码来源:node.go
示例2: createTestNode
// createTestNode assembles the pieces needed by node tests: an rpc server
// for addr (started before returning), a gossip instance, a KV client built
// on a DistSender, and a Node constructed from the resulting store context.
// It returns the rpc server, the clock, the node and the stopper created
// here.
//
// If gossipBS is not nil it is installed as the only gossip resolver and
// gossip is started; a gossipBS equal to addr is first replaced by the
// server's actual address. The engines argument is not referenced by the
// body. Any failure aborts the test through t.Fatal.
func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
	*rpc.Server, *hlc.Clock, *Node, *stop.Stopper) {
	stopper := stop.NewStopper()
	storeCtx := storage.StoreContext{}
	storeCtx.Clock = hlc.NewClock(hlc.UnixNano)
	rpcCtx := rpc.NewContext(nodeTestBaseContext, storeCtx.Clock, stopper)
	storeCtx.ScanInterval = 10 * time.Hour

	server := rpc.NewServer(addr, rpcCtx)
	if startErr := server.Start(); startErr != nil {
		t.Fatal(startErr)
	}

	gsp := gossip.New(rpcCtx, testContext.GossipInterval, testContext.GossipBootstrapResolvers)
	if gossipBS != nil {
		bootstrapAddr := gossipBS
		// Handle possibility of a :0 port specification.
		if bootstrapAddr == addr {
			bootstrapAddr = server.Addr()
		}
		gsp.SetResolvers([]resolver.Resolver{resolver.NewResolverFromAddress(bootstrapAddr)})
		gsp.Start(server, stopper)
	}
	storeCtx.Gossip = gsp

	distSender := kv.NewDistSender(&kv.DistSenderContext{Clock: storeCtx.Clock}, gsp)
	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	var openErr error
	storeCtx.DB, openErr = client.Open("//[email protected]", client.SenderOpt(distSender))
	if openErr != nil {
		t.Fatal(openErr)
	}

	// TODO(bdarnell): arrange to have the transport closed.
	// (or attach LocalRPCTransport.Close to the stopper)
	storeCtx.Transport = multiraft.NewLocalRPCTransport(stopper)
	storeCtx.EventFeed = util.NewFeed(stopper)

	return server, storeCtx.Clock, NewNode(storeCtx), stopper
}
开发者ID:ErikGrimes,项目名称:cockroach,代码行数:38,代码来源:node_test.go
示例3: createTestStoreWithoutStart
// createTestStoreWithoutStart builds a store on top of an in-memory engine
// (created with a 10<<20 size argument) and bootstraps it as node 1 / store
// 1, including the initial range data, but does not start it. It returns
// the store, the manual clock that backs the store's clock, and the stopper.
// The caller is responsible for stopping the stopper upon completion.
// Any failure aborts the test through t.Fatal.
func createTestStoreWithoutStart(t *testing.T) (*Store, *hlc.ManualClock, *stop.Stopper) {
	stopper := stop.NewStopper()
	rpcCtx := rpc.NewContext(rootTestBaseContext, hlc.NewClock(hlc.UnixNano), stopper)

	// Work on a copy of the shared test context.
	storeCtx := TestStoreContext
	storeCtx.Gossip = gossip.New(rpcCtx, gossip.TestInterval, gossip.TestBootstrap)
	manualClock := hlc.NewManualClock(0)
	storeCtx.Clock = hlc.NewClock(manualClock.UnixNano)
	inMem := engine.NewInMem(proto.Attributes{}, 10<<20)
	storeCtx.Transport = multiraft.NewLocalRPCTransport()
	stopper.AddCloser(storeCtx.Transport)

	// The sender needs the store and the store needs the DB built on the
	// sender, so the sender's store field is filled in after NewStore.
	fakeSender := &testSender{}
	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	var openErr error
	storeCtx.DB, openErr = client.Open("//[email protected]", client.SenderOpt(fakeSender))
	if openErr != nil {
		t.Fatal(openErr)
	}

	st := NewStore(storeCtx, inMem, &proto.NodeDescriptor{NodeID: 1})
	fakeSender.store = st

	ident := proto.StoreIdent{NodeID: 1, StoreID: 1}
	if bootErr := st.Bootstrap(ident, stopper); bootErr != nil {
		t.Fatal(bootErr)
	}
	if rangeErr := st.BootstrapRange(); rangeErr != nil {
		t.Fatal(rangeErr)
	}
	return st, manualClock, stopper
}
开发者ID:backend2use,项目名称:cockroachdb,代码行数:29,代码来源:store_test.go
示例4: Start
// Start brings up the local test cluster. It populates the cluster's
// manual clock, clock, stopper, gossip instance, in-memory engine (created
// with a 50<<20 size argument), local sender, transactional sender and DB,
// then creates a single store (node 1, store 1) which is bootstrapped,
// registered with the local sender, given its initial range data and
// finally started. Failures are reported through t.
func (ltc *LocalTestCluster) Start(t util.Tester) {
	ltc.Manual = hlc.NewManualClock(0)
	ltc.Clock = hlc.NewClock(ltc.Manual.UnixNano)
	ltc.Stopper = stop.NewStopper()
	rpcCtx := rpc.NewContext(testutils.NewRootTestBaseContext(), ltc.Clock, ltc.Stopper)
	ltc.Gossip = gossip.New(rpcCtx, gossip.TestInterval, gossip.TestBootstrap)
	ltc.Eng = engine.NewInMem(proto.Attributes{}, 50<<20)
	ltc.lSender = newRetryableLocalSender(NewLocalSender())
	ltc.Sender = NewTxnCoordSender(ltc.lSender, ltc.Clock, false, nil, ltc.Stopper)

	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	var openErr error
	ltc.DB, openErr = client.Open("//[email protected]", client.SenderOpt(ltc.Sender))
	if openErr != nil {
		t.Fatal(openErr)
	}

	raftTransport := multiraft.NewLocalRPCTransport(ltc.Stopper)
	ltc.Stopper.AddCloser(raftTransport)

	// Start from a copy of the shared test store context and point it at
	// the components created above.
	storeCtx := storage.TestStoreContext
	storeCtx.Clock = ltc.Clock
	storeCtx.DB = ltc.DB
	storeCtx.Gossip = ltc.Gossip
	storeCtx.Transport = raftTransport
	ltc.Store = storage.NewStore(storeCtx, ltc.Eng, &proto.NodeDescriptor{NodeID: 1})

	ident := proto.StoreIdent{NodeID: 1, StoreID: 1}
	if bootErr := ltc.Store.Bootstrap(ident, ltc.Stopper); bootErr != nil {
		t.Fatalf("unable to start local test cluster: %s", bootErr)
	}
	ltc.lSender.AddStore(ltc.Store)
	if rangeErr := ltc.Store.BootstrapRange(nil); rangeErr != nil {
		t.Fatalf("unable to start local test cluster: %s", rangeErr)
	}
	if startErr := ltc.Store.Start(ltc.Stopper); startErr != nil {
		t.Fatalf("unable to start local test cluster: %s", startErr)
	}
}
开发者ID:knorwood,项目名称:cockroach,代码行数:37,代码来源:local_test_cluster.go
示例5: Start
// Start prepares the multiTestContext for test t and then adds numStores
// stores. The manual clock, clock, gossip instance, client stopper,
// transport, store pool, DB and transport stopper are each created only if
// the corresponding field is still nil, so values set by the caller
// beforehand are kept. A new local sender is always appended to m.senders,
// and m.transport is always registered as a closer on the transport
// stopper.
func (m *multiTestContext) Start(t *testing.T, numStores int) {
	m.t = t

	// Clocks: the hybrid clock is driven by the manual clock.
	if m.manualClock == nil {
		m.manualClock = hlc.NewManualClock(0)
	}
	if m.clock == nil {
		m.clock = hlc.NewClock(m.manualClock.UnixNano)
	}

	if m.gossip == nil {
		rpcCtx := rpc.NewContext(&base.Context{}, m.clock, nil)
		m.gossip = gossip.New(rpcCtx, gossip.TestInterval, gossip.TestBootstrap)
	}

	// The client stopper is handed to the transport, the store pool and the
	// DB's sender below.
	if m.clientStopper == nil {
		m.clientStopper = stop.NewStopper()
	}
	if m.transport == nil {
		m.transport = multiraft.NewLocalRPCTransport(m.clientStopper)
	}
	if m.storePool == nil {
		m.storePool = storage.NewStorePool(m.gossip, storage.TestTimeUntilStoreDeadOff, m.clientStopper)
	}

	// Always create the first sender.
	m.senders = append(m.senders, kv.NewLocalSender())
	if m.db == nil {
		coord := kv.NewTxnCoordSender(m.senders[0], m.clock, false, nil, m.clientStopper)
		var openErr error
		m.db, openErr = client.Open("//", client.SenderOpt(coord))
		if openErr != nil {
			t.Fatal(openErr)
		}
	}

	for added := 0; added < numStores; added++ {
		m.addStore()
	}

	if m.transportStopper == nil {
		m.transportStopper = stop.NewStopper()
	}
	m.transportStopper.AddCloser(m.transport)
}
开发者ID:snehasis419,项目名称:cockroach,代码行数:41,代码来源:client_test.go
示例6: createTestStoreWithEngine
// createTestStoreWithEngine creates and starts a test store on the given
// engine and clock, returning the store together with the stopper created
// for it. The caller is responsible for closing the store on exit.
//
// If context is nil a copy of storage.TestStoreContext is used; otherwise
// the caller's context is modified in place (its Gossip, Clock, DB and
// Transport fields are overwritten). When bootstrap is true the store is
// bootstrapped as node 1 / store 1 and its initial range data is written
// after the store has been added to the local sender. Any failure aborts
// the test through t.Fatal.
func createTestStoreWithEngine(t *testing.T, eng engine.Engine, clock *hlc.Clock,
	bootstrap bool, context *storage.StoreContext) (*storage.Store, *stop.Stopper) {
	stopper := stop.NewStopper()
	rpcCtx := rpc.NewContext(&base.Context{}, hlc.NewClock(hlc.UnixNano), stopper)
	if context == nil {
		// Copy the shared defaults so they are not mutated.
		defaults := storage.TestStoreContext
		context = &defaults
	}
	context.Gossip = gossip.New(rpcCtx, gossip.TestInterval, gossip.TestBootstrap)
	localSender := kv.NewLocalSender()
	coord := kv.NewTxnCoordSender(localSender, clock, false, nil, stopper)
	context.Clock = clock

	var openErr error
	context.DB, openErr = client.Open("//", client.SenderOpt(coord))
	if openErr != nil {
		t.Fatal(openErr)
	}

	context.Transport = multiraft.NewLocalRPCTransport(stopper)
	// TODO(bdarnell): arrange to have the transport closed.
	st := storage.NewStore(*context, eng, &proto.NodeDescriptor{NodeID: 1})
	if bootstrap {
		ident := proto.StoreIdent{NodeID: 1, StoreID: 1}
		if bootErr := st.Bootstrap(ident, stopper); bootErr != nil {
			t.Fatal(bootErr)
		}
	}
	localSender.AddStore(st)
	if bootstrap {
		if rangeErr := st.BootstrapRange(nil); rangeErr != nil {
			t.Fatal(rangeErr)
		}
	}
	if startErr := st.Start(stopper); startErr != nil {
		t.Fatal(startErr)
	}
	return st, stopper
}
开发者ID:Eric-Gaudiello,项目名称:cockroach,代码行数:38,代码来源:client_test.go
示例7: TestIDAllocationRetry
// TestIDAllocationRetry checks node and store ID allocation against a fake
// sender that reports a retryable error whenever its counter is even. The
// counter starts at 98 and is bumped on every call, so the expected IDs (99
// and 101) are only reached if the allocator retries after a failed call.
func TestIDAllocationRetry(t *testing.T) {
	defer leaktest.AfterTest(t)
	// setAllocRetryBackoff returns a func that is deferred here (presumably
	// restoring the previous backoff — confirm against its definition).
	defer setAllocRetryBackoff(0)()
	i := 98
	sender := func(_ context.Context, c proto.Call) {
		// Even counter values fail with a retryable error; the reply value
		// is still filled in and the counter still advances.
		if i%2 == 0 {
			c.Reply.Header().Error = &proto.Error{Retryable: true}
		}
		c.Reply.(*proto.IncrementResponse).NewValue = int64(i)
		i++
	}
	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	db, err := client.Open("//[email protected]", client.SenderOpt(testSender(sender)))
	if err != nil {
		t.Fatal(err)
	}
	// err is formatted with %v because these branches are also taken when
	// only the ID is wrong and err is nil; %s would print "%!s(<nil>)".
	if n, err := allocateNodeID(db); n != 99 || err != nil {
		t.Fatalf("wanted NodeID 99, got %d (err=%v)", n, err)
	}
	if n, err := allocateStoreIDs(1, 1, db); n != 101 || err != nil {
		t.Fatalf("wanted StoreID 101, got %d (err=%v)", n, err)
	}
}
开发者ID:huaxling,项目名称:cockroach,代码行数:24,代码来源:node_test.go
示例8: NewServer
// NewServer creates a Server from a server.Context. It validates the
// context (non-nil, resolvable TCP address, loadable server and client TLS
// configs) and then wires up the clock, rpc server, gossip, KV senders and
// client DB, raft transport, KV DB server, SQL server, node, admin, status
// and time series components. The server is only constructed here; an
// error is returned if validation or any fallible construction step fails.
func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
	if ctx == nil {
		return nil, util.Error("ctx must not be null")
	}

	rpcAddr := ctx.Addr
	if _, resolveErr := net.ResolveTCPAddr("tcp", rpcAddr); resolveErr != nil {
		return nil, util.Errorf("unable to resolve RPC address %q: %v", rpcAddr, resolveErr)
	}

	if ctx.Insecure {
		log.Warning("running in insecure mode, this is strongly discouraged. See --insecure and --certs.")
	}

	// Try loading the TLS configs before anything else.
	if _, tlsErr := ctx.GetServerTLSConfig(); tlsErr != nil {
		return nil, tlsErr
	}
	if _, tlsErr := ctx.GetClientTLSConfig(); tlsErr != nil {
		return nil, tlsErr
	}

	srv := &Server{
		ctx:     ctx,
		mux:     http.NewServeMux(),
		clock:   hlc.NewClock(hlc.UnixNano),
		stopper: stopper,
	}
	srv.clock.SetMaxOffset(ctx.MaxOffset)

	rpcCtx := rpc.NewContext(&ctx.Context, srv.clock, stopper)
	// Monitor remote clock offsets in a worker tied to the stopper.
	stopper.RunWorker(func() {
		rpcCtx.RemoteClocks.MonitorRemoteOffsets(stopper)
	})

	srv.rpc = rpc.NewServer(util.MakeUnresolvedAddr("tcp", rpcAddr), rpcCtx)
	srv.stopper.AddCloser(srv.rpc)

	srv.gossip = gossip.New(rpcCtx, srv.ctx.GossipInterval, srv.ctx.GossipBootstrapResolvers)

	// KV stack: DistSender wrapped by a TxnCoordSender, exposed through the
	// client DB.
	eventFeed := util.NewFeed(stopper)
	trc := tracer.NewTracer(eventFeed, rpcAddr)
	distSender := kv.NewDistSender(&kv.DistSenderContext{Clock: srv.clock}, srv.gossip)
	txnSender := kv.NewTxnCoordSender(distSender, srv.clock, ctx.Linearizable, trc, srv.stopper)

	// NOTE(review): the address literal looks like an e-mail-obfuscation
	// artifact of the page this snippet was extracted from — confirm the
	// intended value against the upstream source.
	var err error
	srv.db, err = client.Open("//[email protected]", client.SenderOpt(txnSender))
	if err != nil {
		return nil, err
	}

	srv.raftTransport, err = newRPCTransport(srv.gossip, srv.rpc, rpcCtx)
	if err != nil {
		return nil, err
	}
	srv.stopper.AddCloser(srv.raftTransport)

	srv.kvDB = kv.NewDBServer(&srv.ctx.Context, txnSender)
	if srv.ctx.ExperimentalRPCServer {
		if regErr := srv.kvDB.RegisterRPC(srv.rpc); regErr != nil {
			return nil, regErr
		}
	}

	srv.sqlServer = sql.NewServer(&srv.ctx.Context, srv.db)

	// TODO(bdarnell): make StoreConfig configurable.
	storeCtx := storage.StoreContext{
		Clock:           srv.clock,
		DB:              srv.db,
		Gossip:          srv.gossip,
		Transport:       srv.raftTransport,
		ScanInterval:    srv.ctx.ScanInterval,
		ScanMaxIdleTime: srv.ctx.ScanMaxIdleTime,
		EventFeed:       eventFeed,
		Tracer:          trc,
	}
	srv.node = NewNode(storeCtx)

	srv.admin = newAdminServer(srv.db, srv.stopper)
	srv.status = newStatusServer(srv.db, srv.gossip, ctx)
	srv.tsDB = ts.NewDB(srv.db)
	srv.tsServer = ts.NewServer(srv.tsDB)

	return srv, nil
}
开发者ID:nkhuyu,项目名称:cockroach,代码行数:83,代码来源:server.go
示例9: sendOne
//.........这里部分代码省略.........
trace.Event("coordinator spawns")
txnMeta = &txnMetadata{
txn: *txn,
keys: cache.NewIntervalCache(cache.Config{Policy: cache.CacheNone}),
firstUpdateNanos: tc.clock.PhysicalNow(),
lastUpdateNanos: tc.clock.PhysicalNow(),
timeoutDuration: tc.clientTimeout,
txnEnd: make(chan struct{}),
}
tc.txns[id] = txnMeta
if !tc.stopper.RunAsyncTask(func() {
tc.heartbeatLoop(id)
}) {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
call.Reply.Header().SetGoError(&proto.NodeUnavailableError{})
tc.Unlock()
tc.unregisterTxn(id)
return
}
}
txnMeta.addKeyRange(header.Key, header.EndKey)
}
// Update our record of this transaction.
if txnMeta != nil {
txnMeta.txn = *txn
txnMeta.setLastUpdate(tc.clock.PhysicalNow())
}
}
tc.Unlock()
}
// Cleanup intents and transaction map if end of transaction.
switch t := call.Reply.Header().GoError().(type) {
case *proto.TransactionStatusError:
// Likely already committed or more obscure errors such as epoch or
// timestamp regressions; consider it dead.
tc.cleanupTxn(trace, t.Txn)
case *proto.TransactionAbortedError:
// If already aborted, cleanup the txn on this TxnCoordSender.
tc.cleanupTxn(trace, t.Txn)
case *proto.OpRequiresTxnError:
// Run a one-off transaction with that single command.
if log.V(1) {
log.Infof("%s: auto-wrapping in txn and re-executing", call.Method())
}
// TODO(tschottdorf): this part is awkward. Consider resending here
// without starting a new call, which is hard to trace. Plus, the
// below depends on default configuration.
tmpDB, err := client.Open(
fmt.Sprintf("//%s?priority=%d",
call.Args.Header().User, call.Args.Header().GetUserPriority()),
client.SenderOpt(tc))
if err != nil {
log.Warning(err)
return
}
call.Reply.Reset()
if err := tmpDB.Txn(func(txn *client.Txn) error {
txn.SetDebugName("auto-wrap", 0)
b := &client.Batch{}
b.InternalAddCall(call)
return txn.CommitInBatch(b)
}); err != nil {
log.Warning(err)
}
case nil:
if txn := call.Reply.Header().Txn; txn != nil {
if _, ok := call.Args.(*proto.EndTransactionRequest); ok {
// If the --linearizable flag is set, we want to make sure that
// all the clocks in the system are past the commit timestamp
// of the transaction. This is guaranteed if either
// - the commit timestamp is MaxOffset behind startNS
// - MaxOffset ns were spent in this function
// when returning to the client. Below we choose the option
// that involves less waiting, which is likely the first one
// unless a transaction commits with an odd timestamp.
if tsNS := txn.Timestamp.WallTime; startNS > tsNS {
startNS = tsNS
}
sleepNS := tc.clock.MaxOffset() -
time.Duration(tc.clock.PhysicalNow()-startNS)
if tc.linearizable && sleepNS > 0 {
defer func() {
if log.V(1) {
log.Infof("%v: waiting %s on EndTransaction for linearizability", txn.Short(), util.TruncateDuration(sleepNS, time.Millisecond))
}
time.Sleep(sleepNS)
}()
}
if txn.Status != proto.PENDING {
tc.cleanupTxn(trace, *txn)
}
}
}
}
}
开发者ID:knorwood,项目名称:cockroach,代码行数:101,代码来源:txn_coord_sender.go
示例10: sendOne
// sendOne sends a single call via the wrapped sender. If the call is
// part of a transaction, the TxnCoordSender adds the transaction to a
// map of active transactions and begins heartbeating it. Every
// subsequent call for the same transaction updates the lastUpdate
// timestamp to prevent live transactions from being considered
// abandoned and garbage collected. Read/write mutating requests have
// their key or key range added to the transaction's interval tree of
// key ranges for eventual cleanup via resolved write intents.
//
// On success, and if the call is part of a transaction, the affected
// key range is recorded as live intents for eventual cleanup upon
// transaction commit. Upon successful txn commit, initiates cleanup
// of intents.
func (tc *TxnCoordSender) sendOne(call proto.Call) {
var startNS int64
header := call.Args.Header()
// If this call is part of a transaction...
if header.Txn != nil {
// Set the timestamp to the original timestamp for read-only
// commands and to the transaction timestamp for read/write
// commands.
if proto.IsReadOnly(call.Args) {
header.Timestamp = header.Txn.OrigTimestamp
} else {
header.Timestamp = header.Txn.Timestamp
}
// EndTransaction must have its key set to that of the txn.
if _, ok := call.Args.(*proto.EndTransactionRequest); ok {
header.Key = header.Txn.Key
// Remember when EndTransaction started in case we want to
// be linearizable.
startNS = tc.clock.PhysicalNow()
}
}
// Send the command through wrapped sender.
tc.wrapped.Send(context.TODO(), call)
if header.Txn != nil {
// If not already set, copy the request txn.
if call.Reply.Header().Txn == nil {
call.Reply.Header().Txn = gogoproto.Clone(header.Txn).(*proto.Transaction)
}
tc.updateResponseTxn(header, call.Reply.Header())
}
if txn := call.Reply.Header().Txn; txn != nil {
tc.Lock()
txnMeta := tc.txns[string(txn.ID)]
// If this transactional command leaves transactional intents, add the key
// or key range to the intents map. If the transaction metadata doesn't yet
// exist, create it.
if call.Reply.Header().GoError() == nil {
if proto.IsTransactionWrite(call.Args) {
if txnMeta == nil {
txnMeta = &txnMetadata{
txn: *txn,
keys: cache.NewIntervalCache(cache.Config{Policy: cache.CacheNone}),
firstUpdateNanos: tc.clock.PhysicalNow(),
lastUpdateNanos: tc.clock.PhysicalNow(),
timeoutDuration: tc.clientTimeout,
txnEnd: make(chan struct{}),
}
id := string(txn.ID)
tc.txns[id] = txnMeta
tc.heartbeat(id)
}
txnMeta.addKeyRange(header.Key, header.EndKey)
}
// Update our record of this transaction.
if txnMeta != nil {
txnMeta.txn = *txn
txnMeta.setLastUpdate(tc.clock.PhysicalNow())
}
}
tc.Unlock()
}
// Cleanup intents and transaction map if end of transaction.
switch t := call.Reply.Header().GoError().(type) {
case *proto.TransactionStatusError:
// Likely already committed or more obscure errors such as epoch or
// timestamp regressions; consider it dead.
tc.cleanupTxn(t.Txn, nil)
case *proto.TransactionAbortedError:
// If already aborted, cleanup the txn on this TxnCoordSender.
tc.cleanupTxn(t.Txn, nil)
case *proto.OpRequiresTxnError:
// Run a one-off transaction with that single command.
if log.V(1) {
log.Infof("%s: auto-wrapping in txn and re-executing", call.Method())
}
tmpDB, err := client.Open(
fmt.Sprintf("//%s?priority=%d",
call.Args.Header().User, call.Args.Header().GetUserPriority()),
client.SenderOpt(tc))
if err != nil {
log.Warning(err)
return
}
//.........这里部分代码省略.........
开发者ID:Hellblazer,项目名称:cockroach,代码行数:101,代码来源:txn_coord_sender.go
示例11: verifyUncertainty
// verifyUncertainty writes values to a key in 5ns intervals and then launches
// a transaction at each value's timestamp reading that value with
// the maximumOffset given, verifying in the process that the correct values
// are read (usually after one transaction restart).
//
// concurrency is the number of values written and the number of reader
// goroutines started. Each reader runs with its own manual clock pinned to
// the write timestamp of its value and a max offset of maxOffset, and
// expects to read the value int(maxOffset)/5 versions ahead of its own,
// capped at the last value written. Failures are reported through t.
func verifyUncertainty(concurrency int, maxOffset time.Duration, t *testing.T) {
	s := createTestDB(t)
	defer s.Stop()
	key := []byte("key-test")
	// wgStart waits for all transactions to line up, wgEnd has the main
	// function wait for them to finish.
	var wgStart, wgEnd sync.WaitGroup
	wgStart.Add(concurrency + 1)
	wgEnd.Add(concurrency)
	// Initial high offset to allow for future writes.
	s.Clock.SetMaxOffset(999 * time.Nanosecond)
	s.Manual.Set(s.Clock.MaxOffset().Nanoseconds() + 1)
	for i := 0; i < concurrency; i++ {
		value := []byte(fmt.Sprintf("value-%d", i))
		// Values will be written with 5ns spacing.
		futureTS := s.Clock.Now().Add(5, 0)
		s.Clock.Update(futureTS)
		// Expected number of versions skipped.
		skipCount := int(maxOffset) / 5
		if i+skipCount >= concurrency {
			skipCount = concurrency - i - 1
		}
		readValue := []byte(fmt.Sprintf("value-%d", i+skipCount))
		if err := s.DB.Put(key, value); err != nil {
			t.Errorf("%d: got write error: %s", i, err)
		}
		if gr, err := s.DB.Get(key); err != nil {
			t.Fatalf("%d: expected success reading value: %s", i, err)
		} else if !gr.Exists() || !bytes.Equal(gr.ValueBytes(), value) {
			t.Fatalf("%d: expected success reading value: %v", i, gr.Value)
		}
		go func(i int) {
			defer wgEnd.Done()
			wgStart.Done()
			// Wait until the other goroutines are running.
			wgStart.Wait()
			txnManual := hlc.NewManualClock(futureTS.WallTime)
			txnClock := hlc.NewClock(txnManual.UnixNano)
			// Make sure to incorporate the logical component if the wall time
			// hasn't changed (i=0). The logical component will change
			// internally in a way we can't track, but we want to be just
			// ahead.
			txnClock.Update(futureTS.Add(0, 999))
			// The written values are spaced out in intervals of 5ns, so
			// setting <5ns here should make do without any restarts while
			// higher values require roughly offset/5 restarts.
			txnClock.SetMaxOffset(maxOffset)
			sender := NewTxnCoordSender(s.lSender, txnClock, false, nil, s.Stopper)
			txnDB, err := client.Open("//", client.SenderOpt(sender))
			if err != nil {
				// t.Fatal must only be called from the goroutine running
				// the test function, so record the failure and end this
				// worker instead; the deferred wgEnd.Done still runs.
				t.Error(err)
				return
			}
			if err := txnDB.Txn(func(txn *client.Txn) error {
				// Read within the transaction.
				gr, err := txn.Get(key)
				if err != nil {
					// Uncertainty errors are returned unchanged (rather than
					// wrapped) so their concrete type is preserved for the
					// caller of this closure.
					if _, ok := err.(*proto.ReadWithinUncertaintyIntervalError); ok {
						return err
					}
					return util.Errorf("unexpected read error of type %s: %s", reflect.TypeOf(err), err)
				}
				if !gr.Exists() {
					return util.Errorf("no value read")
				}
				if !bytes.Equal(gr.ValueBytes(), readValue) {
					return util.Errorf("%d: read wrong value %v at %s, wanted %q",
						i, gr.Value, futureTS, readValue)
				}
				return nil
			}); err != nil {
				t.Error(err)
			}
		}(i)
	}
	// Kick the goroutines loose.
	wgStart.Done()
	// Wait for the goroutines to finish.
	wgEnd.Wait()
}
开发者ID:Eric-Gaudiello,项目名称:cockroach,代码行数:89,代码来源:txn_test.go
注：本文中的github.com/cockroachdb/cockroach/client.SenderOpt函数示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论