本文整理汇总了Golang中github.com/cockroachdb/cockroach/util.NewFeed函数的典型用法代码示例。如果您正苦于以下问题:Golang NewFeed函数的具体用法?Golang NewFeed怎么用?Golang NewFeed使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了NewFeed函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: createTestNode
// createTestNode creates an rpc server using the specified address,
// gossip instance, KV database and a node using the specified slice
// of engines. The server, clock and node are returned. If gossipBS is
// not nil, the gossip bootstrap address is set to gossipBS.
func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
*rpc.Server, *hlc.Clock, *Node, *stop.Stopper) {
ctx := storage.StoreContext{}
stopper := stop.NewStopper()
ctx.Clock = hlc.NewClock(hlc.UnixNano)
nodeRPCContext := rpc.NewContext(nodeTestBaseContext, ctx.Clock, stopper)
ctx.ScanInterval = 10 * time.Hour
rpcServer := rpc.NewServer(addr, nodeRPCContext)
if err := rpcServer.Start(); err != nil {
t.Fatal(err)
}
g := gossip.New(nodeRPCContext, testContext.GossipInterval, testContext.GossipBootstrapResolvers)
if gossipBS != nil {
// Handle possibility of a :0 port specification.
if gossipBS == addr {
gossipBS = rpcServer.Addr()
}
g.SetResolvers([]resolver.Resolver{resolver.NewResolverFromAddress(gossipBS)})
g.Start(rpcServer, stopper)
}
ctx.Gossip = g
sender := kv.NewDistSender(&kv.DistSenderContext{Clock: ctx.Clock}, g)
ctx.DB = client.NewDB(sender)
// TODO(bdarnell): arrange to have the transport closed.
// (or attach LocalRPCTransport.Close to the stopper)
ctx.Transport = multiraft.NewLocalRPCTransport(stopper)
ctx.EventFeed = util.NewFeed(stopper)
node := NewNode(ctx)
return rpcServer, ctx.Clock, node, stopper
}
开发者ID:kangxinrong,项目名称:cockroach,代码行数:35,代码来源:node_test.go
示例2: createTestNode
// createTestNode creates an rpc server using the specified address,
// gossip instance, KV database and a node using the specified slice
// of engines. The server, clock and node are returned. If gossipBS is
// not nil, the gossip bootstrap address is set to gossipBS.
func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
*rpc.Server, net.Addr, *hlc.Clock, *Node, *stop.Stopper) {
ctx := storage.StoreContext{}
stopper := stop.NewStopper()
ctx.Clock = hlc.NewClock(hlc.UnixNano)
nodeRPCContext := rpc.NewContext(nodeTestBaseContext, ctx.Clock, stopper)
ctx.ScanInterval = 10 * time.Hour
rpcServer := rpc.NewServer(nodeRPCContext)
grpcServer := grpc.NewServer()
tlsConfig, err := nodeRPCContext.GetServerTLSConfig()
if err != nil {
t.Fatal(err)
}
ln, err := util.ListenAndServe(stopper, grpcutil.GRPCHandlerFunc(grpcServer, rpcServer), addr, tlsConfig)
if err != nil {
t.Fatal(err)
}
g := gossip.New(nodeRPCContext, testContext.GossipBootstrapResolvers, stopper)
if gossipBS != nil {
// Handle possibility of a :0 port specification.
if gossipBS.Network() == addr.Network() && gossipBS.String() == addr.String() {
gossipBS = ln.Addr()
}
r, err := resolver.NewResolverFromAddress(gossipBS)
if err != nil {
t.Fatalf("bad gossip address %s: %s", gossipBS, err)
}
g.SetResolvers([]resolver.Resolver{r})
g.Start(grpcServer, ln.Addr())
}
ctx.Gossip = g
retryOpts := kv.GetDefaultDistSenderRetryOptions()
retryOpts.Closer = stopper.ShouldDrain()
distSender := kv.NewDistSender(&kv.DistSenderContext{
Clock: ctx.Clock,
RPCContext: nodeRPCContext,
RPCRetryOptions: &retryOpts,
}, g)
tracer := tracing.NewTracer()
sender := kv.NewTxnCoordSender(distSender, ctx.Clock, false, tracer, stopper)
ctx.DB = client.NewDB(sender)
// TODO(bdarnell): arrange to have the transport closed.
// (or attach LocalRPCTransport.Close to the stopper)
ctx.Transport = storage.NewLocalRPCTransport(stopper)
ctx.EventFeed = util.NewFeed(stopper)
ctx.Tracer = tracer
node := NewNode(ctx, metric.NewRegistry(), stopper, nil)
return rpcServer, ln.Addr(), ctx.Clock, node, stopper
}
开发者ID:binlijin,项目名称:cockroach,代码行数:54,代码来源:node_test.go
示例3: TestNodeEventFeedTransactionRestart
// TestNodeEventFeedTransactionRestart verifies that calls which indicate a
// transaction restart are counted as successful.
func TestNodeEventFeedTransactionRestart(t *testing.T) {
defer leaktest.AfterTest(t)
stopper := stop.NewStopper()
feed := util.NewFeed(stopper)
nodeID := proto.NodeID(1)
nodefeed := status.NewNodeEventFeed(nodeID, feed)
ner := nodeEventReader{}
ner.readEvents(feed)
nodefeed.CallComplete(&proto.GetRequest{}, &proto.GetResponse{
ResponseHeader: proto.ResponseHeader{
Error: &proto.Error{
TransactionRestart: proto.TransactionRestart_BACKOFF,
},
},
})
nodefeed.CallComplete(&proto.GetRequest{}, &proto.GetResponse{
ResponseHeader: proto.ResponseHeader{
Error: &proto.Error{
TransactionRestart: proto.TransactionRestart_IMMEDIATE,
},
},
})
nodefeed.CallComplete(&proto.PutRequest{}, &proto.PutResponse{
ResponseHeader: proto.ResponseHeader{
Error: &proto.Error{
TransactionRestart: proto.TransactionRestart_ABORT,
},
},
})
feed.Flush()
stopper.Stop()
exp := []string{
"Get",
"Get",
"failed Put",
}
if !reflect.DeepEqual(exp, ner.perNodeFeeds[nodeID]) {
t.Fatalf("received unexpected events: %s", ner.eventFeedString())
}
}
开发者ID:husttom,项目名称:cockroach,代码行数:47,代码来源:feed_test.go
示例4: TestNodeEventFeedTransactionRestart
// TestNodeEventFeedTransactionRestart verifies that calls which indicate a
// transaction restart are counted as successful.
func TestNodeEventFeedTransactionRestart(t *testing.T) {
defer leaktest.AfterTest(t)
stopper := stop.NewStopper()
feed := util.NewFeed(stopper)
nodeID := roachpb.NodeID(1)
nodefeed := status.NewNodeEventFeed(nodeID, feed)
ner := nodeEventReader{}
ner.readEvents(feed)
d := 5 * time.Second
get := wrap(&roachpb.GetRequest{})
nodefeed.CallComplete(get, d, &roachpb.Error{
TransactionRestart: roachpb.TransactionRestart_BACKOFF})
nodefeed.CallComplete(get, d, &roachpb.Error{
TransactionRestart: roachpb.TransactionRestart_IMMEDIATE})
nodefeed.CallComplete(wrap(&roachpb.PutRequest{}), d, &roachpb.Error{
TransactionRestart: roachpb.TransactionRestart_ABORT})
nodefeed.CallComplete(wrap(&roachpb.PutRequest{}), d, &roachpb.Error{
Detail: &roachpb.ErrorDetail{
WriteIntent: &roachpb.WriteIntentError{
Index: &roachpb.ErrPosition{Index: 0},
},
},
TransactionRestart: roachpb.TransactionRestart_ABORT,
})
feed.Flush()
stopper.Stop()
exp := []string{
"Get",
"Get",
"failed Batch",
"failed Put",
}
if !reflect.DeepEqual(exp, ner.perNodeFeeds[nodeID]) {
t.Fatalf("received unexpected events: %s", ner.eventFeedString())
}
}
开发者ID:kaustubhkurve,项目名称:cockroach,代码行数:44,代码来源:feed_test.go
示例5: createTestNode
// createTestNode creates an rpc server using the specified address,
// gossip instance, KV database and a node using the specified slice
// of engines. The server, clock and node are returned. If gossipBS is
// not nil, the gossip bootstrap address is set to gossipBS.
func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
*rpc.Server, net.Addr, *hlc.Clock, *Node, *stop.Stopper) {
ctx := storage.StoreContext{}
stopper := stop.NewStopper()
ctx.Clock = hlc.NewClock(hlc.UnixNano)
nodeRPCContext := rpc.NewContext(nodeTestBaseContext, ctx.Clock, stopper)
ctx.ScanInterval = 10 * time.Hour
rpcServer := rpc.NewServer(nodeRPCContext)
tlsConfig, err := nodeRPCContext.GetServerTLSConfig()
if err != nil {
t.Fatal(err)
}
ln, err := util.ListenAndServe(stopper, rpcServer, addr, tlsConfig)
if err != nil {
t.Fatal(err)
}
g := gossip.New(nodeRPCContext, testContext.GossipBootstrapResolvers)
if gossipBS != nil {
// Handle possibility of a :0 port specification.
if gossipBS == addr {
gossipBS = ln.Addr()
}
r, err := resolver.NewResolverFromAddress(gossipBS)
if err != nil {
t.Fatalf("bad gossip address %s: %s", gossipBS, err)
}
g.SetResolvers([]resolver.Resolver{r})
g.Start(rpcServer, ln.Addr(), stopper)
}
ctx.Gossip = g
sender := kv.NewDistSender(&kv.DistSenderContext{Clock: ctx.Clock, RPCContext: nodeRPCContext}, g)
ctx.DB = client.NewDB(sender)
// TODO(bdarnell): arrange to have the transport closed.
// (or attach LocalRPCTransport.Close to the stopper)
ctx.Transport = storage.NewLocalRPCTransport(stopper)
ctx.EventFeed = util.NewFeed(stopper)
node := NewNode(ctx, metric.NewRegistry(), stopper)
return rpcServer, ln.Addr(), ctx.Clock, node, stopper
}
开发者ID:ming-hai,项目名称:cockroach,代码行数:44,代码来源:node_test.go
示例6: NewServer
// NewServer creates a Server from a server.Context.
func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
if ctx == nil {
return nil, util.Error("ctx must not be null")
}
addr := ctx.Addr
_, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, util.Errorf("unable to resolve RPC address %q: %v", addr, err)
}
if ctx.Insecure {
log.Warning("running in insecure mode, this is strongly discouraged. See --insecure and --certs.")
}
// Try loading the TLS configs before anything else.
if _, err := ctx.GetServerTLSConfig(); err != nil {
return nil, err
}
if _, err := ctx.GetClientTLSConfig(); err != nil {
return nil, err
}
s := &Server{
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
stopper: stopper,
}
s.clock.SetMaxOffset(ctx.MaxOffset)
rpcContext := rpc.NewContext(&ctx.Context, s.clock, stopper)
stopper.RunWorker(func() {
rpcContext.RemoteClocks.MonitorRemoteOffsets(stopper)
})
s.rpc = rpc.NewServer(util.MakeUnresolvedAddr("tcp", addr), rpcContext)
s.stopper.AddCloser(s.rpc)
s.gossip = gossip.New(rpcContext, s.ctx.GossipInterval, s.ctx.GossipBootstrapResolvers)
feed := util.NewFeed(stopper)
tracer := tracer.NewTracer(feed, addr)
ds := kv.NewDistSender(&kv.DistSenderContext{Clock: s.clock}, s.gossip)
sender := kv.NewTxnCoordSender(ds, s.clock, ctx.Linearizable, tracer, s.stopper)
if s.db, err = client.Open("//[email protected]", client.SenderOpt(sender)); err != nil {
return nil, err
}
s.raftTransport, err = newRPCTransport(s.gossip, s.rpc, rpcContext)
if err != nil {
return nil, err
}
s.stopper.AddCloser(s.raftTransport)
s.kvDB = kv.NewDBServer(&s.ctx.Context, sender)
if s.ctx.ExperimentalRPCServer {
if err = s.kvDB.RegisterRPC(s.rpc); err != nil {
return nil, err
}
}
s.sqlServer = sql.NewServer(&s.ctx.Context, s.db)
// TODO(bdarnell): make StoreConfig configurable.
nCtx := storage.StoreContext{
Clock: s.clock,
DB: s.db,
Gossip: s.gossip,
Transport: s.raftTransport,
ScanInterval: s.ctx.ScanInterval,
ScanMaxIdleTime: s.ctx.ScanMaxIdleTime,
EventFeed: feed,
Tracer: tracer,
}
s.node = NewNode(nCtx)
s.admin = newAdminServer(s.db, s.stopper)
s.status = newStatusServer(s.db, s.gossip, ctx)
s.tsDB = ts.NewDB(s.db)
s.tsServer = ts.NewServer(s.tsDB)
return s, nil
}
开发者ID:nkhuyu,项目名称:cockroach,代码行数:83,代码来源:server.go
示例7: NewServer
// NewServer creates a Server from a server.Context.
func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
if ctx == nil {
return nil, util.Errorf("ctx must not be null")
}
if _, err := net.ResolveTCPAddr("tcp", ctx.Addr); err != nil {
return nil, util.Errorf("unable to resolve RPC address %q: %v", ctx.Addr, err)
}
if ctx.Insecure {
log.Warning("running in insecure mode, this is strongly discouraged. See --insecure and --certs.")
}
// Try loading the TLS configs before anything else.
if _, err := ctx.GetServerTLSConfig(); err != nil {
return nil, err
}
if _, err := ctx.GetClientTLSConfig(); err != nil {
return nil, err
}
s := &Server{
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
stopper: stopper,
}
s.clock.SetMaxOffset(ctx.MaxOffset)
s.rpcContext = crpc.NewContext(&ctx.Context, s.clock, stopper)
stopper.RunWorker(func() {
s.rpcContext.RemoteClocks.MonitorRemoteOffsets(stopper)
})
s.rpc = crpc.NewServer(s.rpcContext)
s.gossip = gossip.New(s.rpcContext, s.ctx.GossipBootstrapResolvers)
s.storePool = storage.NewStorePool(s.gossip, s.clock, ctx.TimeUntilStoreDead, stopper)
feed := util.NewFeed(stopper)
tracer := tracer.NewTracer(feed, ctx.Addr)
ds := kv.NewDistSender(&kv.DistSenderContext{Clock: s.clock, RPCContext: s.rpcContext}, s.gossip)
sender := kv.NewTxnCoordSender(ds, s.clock, ctx.Linearizable, tracer, s.stopper)
s.db = client.NewDB(sender)
var err error
s.raftTransport, err = newRPCTransport(s.gossip, s.rpc, s.rpcContext)
if err != nil {
return nil, err
}
s.stopper.AddCloser(s.raftTransport)
s.kvDB = kv.NewDBServer(&s.ctx.Context, sender)
if err := s.kvDB.RegisterRPC(s.rpc); err != nil {
return nil, err
}
leaseMgr := sql.NewLeaseManager(0, *s.db, s.clock)
leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
s.sqlServer = sql.MakeServer(&s.ctx.Context, *s.db, s.gossip, leaseMgr)
if err := s.sqlServer.RegisterRPC(s.rpc); err != nil {
return nil, err
}
s.pgServer = pgwire.NewServer(&pgwire.Context{
Context: &s.ctx.Context,
Executor: s.sqlServer.Executor,
Stopper: stopper,
})
// TODO(bdarnell): make StoreConfig configurable.
nCtx := storage.StoreContext{
Clock: s.clock,
DB: s.db,
Gossip: s.gossip,
Transport: s.raftTransport,
ScanInterval: s.ctx.ScanInterval,
ScanMaxIdleTime: s.ctx.ScanMaxIdleTime,
EventFeed: feed,
Tracer: tracer,
StorePool: s.storePool,
AllocatorOptions: storage.AllocatorOptions{
AllowRebalance: true,
Mode: s.ctx.BalanceMode,
},
}
s.node = NewNode(nCtx)
s.admin = newAdminServer(s.db, s.stopper)
s.status = newStatusServer(s.db, s.gossip, ctx)
s.tsDB = ts.NewDB(s.db)
s.tsServer = ts.NewServer(s.tsDB)
return s, nil
}
开发者ID:gechong,项目名称:cockroach,代码行数:95,代码来源:server.go
示例8: TestStoreEventFeed
//.........这里部分代码省略.........
Delta: engine.MVCCStats{
LiveBytes: 200,
KeyBytes: 30,
ValBytes: 170,
LastUpdateNanos: 20 * 1E9,
},
},
Removed: RemoveRangeEvent{
Desc: &roachpb.RangeDescriptor{
RangeID: 2,
StartKey: roachpb.RKey("b"),
EndKey: roachpb.RKey("c"),
},
Stats: engine.MVCCStats{
LiveBytes: 200,
KeyBytes: 30,
ValBytes: 170,
LastUpdateNanos: 20 * 1E9,
},
},
},
},
{
"StoreStatus",
func(feed StoreEventFeed) {
feed.storeStatus(storeDesc)
},
&StoreStatusEvent{
Desc: storeDesc,
},
},
{
"ReplicationStatus",
func(feed StoreEventFeed) {
feed.replicationStatus(3, 2, 1)
},
&ReplicationStatusEvent{
StoreID: roachpb.StoreID(1),
LeaderRangeCount: 3,
ReplicatedRangeCount: 2,
AvailableRangeCount: 1,
},
},
{
"StartStore",
func(feed StoreEventFeed) {
feed.startStore(100)
},
&StartStoreEvent{
StoreID: roachpb.StoreID(1),
StartedAt: 100,
},
},
{
"BeginScanRanges",
func(feed StoreEventFeed) {
feed.beginScanRanges()
},
&BeginScanRangesEvent{
StoreID: roachpb.StoreID(1),
},
},
{
"EndScanRanges",
func(feed StoreEventFeed) {
feed.endScanRanges()
},
&EndScanRangesEvent{
StoreID: roachpb.StoreID(1),
},
},
}
// Compile expected events into a single slice.
expectedEvents := make([]interface{}, len(testCases))
for i := range testCases {
expectedEvents[i] = testCases[i].expected
}
events := make([]interface{}, 0, len(expectedEvents))
// Run test cases directly through a feed.
stopper := stop.NewStopper()
defer stopper.Stop()
feed := util.NewFeed(stopper)
feed.Subscribe(func(event interface{}) {
events = append(events, event)
})
storefeed := NewStoreEventFeed(roachpb.StoreID(1), feed)
for _, tc := range testCases {
tc.publishTo(storefeed)
}
feed.Flush()
if a, e := events, expectedEvents; !reflect.DeepEqual(a, e) {
t.Errorf("received incorrect events.\nexpected: %v\nactual: %v", e, a)
}
}
开发者ID:mbertschler,项目名称:cockroach,代码行数:101,代码来源:feed_test.go
示例9: NewServer
// NewServer creates a Server from a server.Context.
func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
if ctx == nil {
return nil, util.Errorf("ctx must not be null")
}
if _, err := net.ResolveTCPAddr("tcp", ctx.Addr); err != nil {
return nil, util.Errorf("unable to resolve RPC address %q: %v", ctx.Addr, err)
}
if ctx.Insecure {
log.Warning("running in insecure mode, this is strongly discouraged. See --insecure and --certs.")
}
// Try loading the TLS configs before anything else.
if _, err := ctx.GetServerTLSConfig(); err != nil {
return nil, err
}
if _, err := ctx.GetClientTLSConfig(); err != nil {
return nil, err
}
s := &Server{
Tracer: tracing.NewTracer(),
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
stopper: stopper,
}
s.clock.SetMaxOffset(ctx.MaxOffset)
s.rpcContext = crpc.NewContext(&ctx.Context, s.clock, stopper)
stopper.RunWorker(func() {
s.rpcContext.RemoteClocks.MonitorRemoteOffsets(stopper)
})
s.rpc = crpc.NewServer(s.rpcContext)
s.gossip = gossip.New(s.rpcContext, s.ctx.GossipBootstrapResolvers, stopper)
s.storePool = storage.NewStorePool(s.gossip, s.clock, ctx.TimeUntilStoreDead, stopper)
feed := util.NewFeed(stopper)
// A custom RetryOptions is created which uses stopper.ShouldDrain() as
// the Closer. This prevents infinite retry loops from occurring during
// graceful server shutdown
//
// Such a loop loop occurs with the DistSender attempts a connection to the
// local server during shutdown, and receives an internal server error (HTTP
// Code 5xx). This is the correct error for a server to return when it is
// shutting down, and is normally retryable in a cluster environment.
// However, on a single-node setup (such as a test), retries will never
// succeed because the only server has been shut down; thus, thus the
// DistSender needs to know that it should not retry in this situation.
retryOpts := kv.GetDefaultDistSenderRetryOptions()
retryOpts.Closer = stopper.ShouldDrain()
ds := kv.NewDistSender(&kv.DistSenderContext{
Clock: s.clock,
RPCContext: s.rpcContext,
RPCRetryOptions: &retryOpts,
}, s.gossip)
txnRegistry := metric.NewRegistry()
txnMetrics := kv.NewTxnMetrics(txnRegistry)
sender := kv.NewTxnCoordSender(ds, s.clock, ctx.Linearizable, s.Tracer, s.stopper, txnMetrics)
s.db = client.NewDB(sender)
s.grpc = grpc.NewServer()
s.raftTransport = storage.NewRaftTransport(storage.GossipAddressResolver(s.gossip), s.grpc, s.rpcContext)
s.kvDB = kv.NewDBServer(&s.ctx.Context, sender, stopper)
if err := s.kvDB.RegisterRPC(s.rpc); err != nil {
return nil, err
}
s.leaseMgr = sql.NewLeaseManager(0, *s.db, s.clock)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
sqlRegistry := metric.NewRegistry()
s.sqlExecutor = sql.NewExecutor(*s.db, s.gossip, s.leaseMgr, s.stopper, sqlRegistry)
s.pgServer = pgwire.MakeServer(&s.ctx.Context, s.sqlExecutor, sqlRegistry)
// TODO(bdarnell): make StoreConfig configurable.
nCtx := storage.StoreContext{
Clock: s.clock,
DB: s.db,
Gossip: s.gossip,
Transport: s.raftTransport,
ScanInterval: s.ctx.ScanInterval,
ScanMaxIdleTime: s.ctx.ScanMaxIdleTime,
EventFeed: feed,
Tracer: s.Tracer,
StorePool: s.storePool,
SQLExecutor: sql.InternalExecutor{
LeaseManager: s.leaseMgr,
},
LogRangeEvents: true,
AllocatorOptions: storage.AllocatorOptions{
AllowRebalance: true,
Mode: s.ctx.BalanceMode,
},
}
//.........这里部分代码省略.........
开发者ID:mrtracy,项目名称:cockroach,代码行数:101,代码来源:server.go
示例10: TestNodeEventFeed
func TestNodeEventFeed(t *testing.T) {
defer leaktest.AfterTest(t)
nodeDesc := roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(99),
}
// A testCase corresponds to a single Store event type. Each case contains a
// method which publishes a single event to the given storeEventPublisher,
// and an expected result interface which should match the produced
// event.
testCases := []struct {
publishTo func(status.NodeEventFeed)
expected interface{}
}{
{
publishTo: func(nef status.NodeEventFeed) {
nef.StartNode(nodeDesc, 100)
},
expected: &status.StartNodeEvent{
Desc: nodeDesc,
StartedAt: 100,
},
},
{
publishTo: func(nef status.NodeEventFeed) {
nef.CallComplete(wrap(roachpb.NewGet(roachpb.Key("abc"))), 0, nil)
},
expected: &status.CallSuccessEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Get,
},
},
{
publishTo: func(nef status.NodeEventFeed) {
nef.CallComplete(wrap(roachpb.NewPut(roachpb.Key("abc"), roachpb.MakeValueFromString("def"))), 0, nil)
},
expected: &status.CallSuccessEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Put,
},
},
{
publishTo: func(nef status.NodeEventFeed) {
nef.CallComplete(wrap(roachpb.NewGet(roachpb.Key("abc"))), 0, roachpb.NewErrorf("error"))
},
expected: &status.CallErrorEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Batch,
},
},
{
publishTo: func(nef status.NodeEventFeed) {
nef.CallComplete(wrap(roachpb.NewGet(roachpb.Key("abc"))), time.Minute, &roachpb.Error{
Detail: &roachpb.ErrorDetail{
WriteIntent: &roachpb.WriteIntentError{},
},
Index: &roachpb.ErrPosition{Index: 0},
Message: "boo",
})
},
expected: &status.CallErrorEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Get,
Duration: time.Minute,
},
},
}
// Compile expected events into a single slice.
expectedEvents := make([]interface{}, len(testCases))
for i := range testCases {
expectedEvents[i] = testCases[i].expected
}
events := make([]interface{}, 0, len(expectedEvents))
// Run test cases directly through a feed.
stopper := stop.NewStopper()
defer stopper.Stop()
feed := util.NewFeed(stopper)
feed.Subscribe(func(event interface{}) {
events = append(events, event)
})
nodefeed := status.NewNodeEventFeed(roachpb.NodeID(1), feed)
for _, tc := range testCases {
tc.publishTo(nodefeed)
}
feed.Flush()
if a, e := events, expectedEvents; !reflect.DeepEqual(a, e) {
t.Errorf("received incorrect events.\nexpected: %v\nactual: %v", e, a)
}
}
开发者ID:billhongs,项目名称:cockroach,代码行数:96,代码来源:feed_test.go
示例11: TestMultiStoreEventFeed
// TestMultiStoreEventFeed verifies that events on multiple stores are properly
// recieved by a single event reader.
func TestMultiStoreEventFeed(t *testing.T) {
defer leaktest.AfterTest(t)
t.Skip("disabled until #1531 is fixed")
stopper := stop.NewStopper()
// Create a multiTestContext which publishes all store events to the given
// feed.
feed := util.NewFeed(stopper)
mtc := &multiTestContext{
feed: feed,
}
// Start reading events from the feed before starting the stores.
ser := &storeEventReader{
recordUpdateDetail: false,
}
ser.readEvents(feed)
mtc.Start(t, 3)
defer mtc.Stop()
// Replicate the default range.
rangeID := roachpb.RangeID(1)
mtc.replicateRange(rangeID, 0, 1, 2)
// Add some data in a transaction
err := mtc.db.Txn(func(txn *client.Txn) error {
b := &client.Batch{}
b.Put("a", "asdf")
b.Put("c", "jkl;")
return txn.CommitInBatch(b)
})
if err != nil {
t.Fatalf("error putting data to db: %s", err)
}
// AdminSplit in between the two ranges.
if err := mtc.db.AdminSplit("b"); err != nil {
t.Fatalf("error splitting initial: %s", err)
}
// AdminSplit an empty range at the end of the second range.
if err := mtc.db.AdminSplit("z"); err != nil {
t.Fatalf("error splitting second range: %s", err)
}
// AdminMerge the empty range back into the second range.
if err := mtc.db.AdminMerge("c"); err != nil {
t.Fatalf("error merging final range: %s", err)
}
// Add an additional put through the system and wait for all
// replicas to receive it.
if _, err := mtc.db.Inc("aa", 5); err != nil {
t.Fatalf("error putting data to db: %s", err)
}
util.SucceedsWithin(t, time.Second, func() error {
for _, eng := range mtc.engines {
val, _, err := engine.MVCCGet(eng, roachpb.Key("aa"), mtc.clock.Now(), true, nil)
if err != nil {
return err
}
if a, e := mustGetInt(val), int64(5); a != e {
return util.Errorf("expected aa = %d, got %d", e, a)
}
}
return nil
})
// Close feed and wait for reader to receive all events.
feed.Flush()
stopper.Stop()
// Compare events to expected values.
expected := map[roachpb.StoreID][]string{
roachpb.StoreID(1): {
"StartStore",
"BeginScanRanges",
"RegisterRange scan=true, rid=1, live=.*",
"EndScanRanges",
"SplitRange origId=1, newId=2, origKey=336, newKey=15",
"SplitRange origId=2, newId=3, origKey=15, newKey=0",
"MergeRange rid=2, subId=3, key=15, subKey=0",
},
roachpb.StoreID(2): {
"StartStore",
"BeginScanRanges",
"EndScanRanges",
"RegisterRange scan=false, rid=1, live=.*",
"SplitRange origId=1, newId=2, origKey=336, newKey=15",
"SplitRange origId=2, newId=3, origKey=15, newKey=0",
"MergeRange rid=2, subId=3, key=15, subKey=0",
},
roachpb.StoreID(3): {
"StartStore",
"BeginScanRanges",
"EndScanRanges",
//.........这里部分代码省略.........
开发者ID:nporsche,项目名称:cockroach,代码行数:101,代码来源:client_event_test.go
示例12: TestNodeStatusMonitor
func TestNodeStatusMonitor(t *testing.T) {
defer leaktest.AfterTest(t)
desc1 := &roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("b"),
}
desc2 := &roachpb.RangeDescriptor{
RangeID: 2,
StartKey: roachpb.RKey("b"),
EndKey: roachpb.RKey("c"),
}
stats := engine.MVCCStats{
LiveBytes: 1,
KeyBytes: 2,
ValBytes: 2,
IntentBytes: 1,
LiveCount: 1,
KeyCount: 1,
ValCount: 1,
IntentCount: 1,
IntentAge: 1,
GCBytesAge: 1,
LastUpdateNanos: 1 * 1E9,
}
stopper := stop.NewStopper()
feed := util.NewFeed(stopper)
monitor := NewNodeStatusMonitor()
monitor.StartMonitorFeed(feed)
for i := 0; i < 3; i++ {
id := roachpb.StoreID(i + 1)
eventList := []interface{}{
&storage.StartStoreEvent{
StoreID: id,
},
&storage.RegisterRangeEvent{
StoreID: id,
Desc: desc1,
Stats: stats,
},
&storage.UpdateRangeEvent{
StoreID: id,
Desc: desc1,
Stats: stats,
Delta: stats,
},
&storage.UpdateRangeEvent{
StoreID: id,
Desc: desc1,
Stats: stats,
Delta: stats,
},
&storage.UpdateRangeEvent{
StoreID: id,
Desc: desc1,
Stats: stats,
Delta: stats,
},
&storage.SplitRangeEvent{
StoreID: id,
Original: storage.UpdateRangeEvent{
StoreID: id,
Desc: desc1,
Stats: stats,
Delta: stats,
},
New: storage.RegisterRangeEvent{
StoreID: id,
Desc: desc2,
Stats: stats,
},
},
&storage.UpdateRangeEvent{
StoreID: id,
Desc: desc2,
Stats: stats,
Delta: stats,
},
&storage.UpdateRangeEvent{
StoreID: id,
Desc: desc2,
Stats: stats,
Delta: stats,
},
&CallSuccessEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Get,
},
&CallSuccessEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Put,
},
&CallErrorEvent{
NodeID: roachpb.NodeID(1),
Method: roachpb.Scan,
},
}
for _, event := range eventList {
//.........这里部分代码省略.........
开发者ID:gechong,项目名称:cockroach,代码行数:101,代码来源:monitor_test.go
示例13: TestNodeEventFeed
func TestNodeEventFeed(t *testing.T) {
defer leaktest.AfterTest(t)
nodeDesc := proto.NodeDescriptor{
NodeID: proto.NodeID(99),
}
// A testCase corresponds to a single Store event type. Each case contains a
// method which publishes a single event to the given storeEventPublisher,
// and an expected result interface which should match the produced
// event.
testCases := []struct {
name string
publishTo func(status.NodeEventFeed)
expected interface{}
}{
{
name: "Start",
publishTo: func(nef status.NodeEventFeed) {
nef.StartNode(nodeDesc, 100)
},
expected: &status.StartNodeEvent{
Desc: nodeDesc,
StartedAt: 100,
},
},
{
name: "Get",
publishTo: func(nef status.NodeEventFeed) {
call := proto.GetCall(proto.Key("abc"))
nef.CallComplete(call.Args, call.Reply)
},
expected: &status.CallSuccessEvent{
NodeID: proto.NodeID(1),
Method: proto.Get,
},
},
{
name: "Put",
publishTo: func(nef status.NodeEventFeed) {
call := proto.PutCall(proto.Key("abc"), proto.Value{Bytes: []byte("def")})
nef.CallComplete(call.Args, call.Reply)
},
expected: &status.CallSuccessEvent{
NodeID: proto.NodeID(1),
Method: proto.Put,
},
},
{
name: "Get Error",
publishTo: func(nef status.NodeEventFeed) {
call := proto.GetCall(proto.Key("abc"))
call.Reply.Header().SetGoError(util.Errorf("error"))
nef.CallComplete(call.Args, call.Reply)
},
expected: &status.CallErrorEvent{
NodeID: proto.NodeID(1),
Method: proto.Get,
},
},
}
// Compile expected events into a single slice.
expectedEvents := make([]interface{}, len(testCases))
for i := range testCases {
expectedEvents[i] = testCases[i].expected
}
events := make([]interface{}, 0, len(expectedEvents))
// Run test cases directly through a feed.
stopper := stop.NewStopper()
defer stopper.Stop()
feed := util.NewFeed(stopper)
feed.Subscribe(func(event interface{}) {
events = append(events, event)
})
nodefeed := status.NewNodeEventFeed(proto.NodeID(1), feed)
for _, tc := range testCases {
tc.publishTo(nodefeed)
}
feed.Flush()
if a, e := events, expectedEvents; !reflect.DeepEqual(a, e) {
t.Errorf("received incorrect events.\nexpected: %v\nactual: %v", e, a)
}
}
开发者ID:husttom,项目名称:cockroach,代码行数:89,代码来源:feed_test.go
示例14: TestTracer
func TestTracer(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop()
feed := util.NewFeed(stopper)
const origin = ":8081"
expTraces := []Trace{
{ID: "10", Content: []TraceItem{
{
depth: 1,
Name: "A1",
Duration: 30 * time.Millisecond,
},
{
depth: 2,
Timestamp: time.Time{}.Add(10 * time.Millisecond),
Name: "A2",
Duration: 10 * time.Millisecond,
},
{
depth: 3,
Timestamp: time.Time{}.Add(15 * time.Millisecond),
Name: "E3",
},
{
depth: 3,
Timestamp: time.Time{}.Add(17 * time.Millisecond),
Name: "E4",
},
}},
}
feed.Subscribe(func(event interface{}) {
trace := event.(*Trace)
if len(expTraces) == 0 {
t.Fatalf("unexpected extra trace: %s", trace)
}
expTrace := expTraces[0]
expTraces = expTraces[1:]
if !reflect.DeepEqual(expTrace.ID, trace.ID) {
t.Errorf("expected ID %s, got %s", expTrace.ID, trace.ID)
}
tc := trace.Content
for i, v := range tc {
if !strings.Contains(v.Func, "TestTracer") || !strings.Contains(v.File, "tracer_test.go") {
t.Errorf("invalid callsite in trace: %s %s", v.Func, v.File)
}
if v.Origin != origin {
t.Fatalf("unexpected origin %s", v.Origin)
}
tc[i].Func, tc[i].File, tc[i].Line, tc[i].Origin = "", "", 0, ""
}
if !reflect.DeepEqual(expTrace.Content, trace.Content) {
t.Fatalf("unexpected content:\n%+v\nwanted:\n%+v", trace, expTrace)
}
})
tracer := NewTracer(feed, origin)
now := &time.Time{}
add := func(t time.Duration) {
then := now.Add(t)
now = &then
}
tracer.now = func() time.Time {
return *now
}
// TODO(tschottdorf): Test some more.
req1 := traceID(10)
t1 := tracer.NewTrace(req1)
e1a := t1.Epoch("A1")
add(10 * time.Millisecond)
e1b := t1.Epoch("A2")
add(5 * time.Millisecond)
t1.Event("E3")
add(2 * time.Millisecond)
t1.Event("E4")
add(3 * time.Millisecond)
e1b() // 10ms elapsed
add(10 * time.Millisecond)
e1a() // 30ms elapsed
t1.Finalize()
if !util.Panics(e1a) || !util.Panics(e1b) {
t.Fatalf("expected a panic when ending an epoch twice")
}
feed.Flush()
if len(expTraces) > 0 {
t.Fatalf("missing traces:\n%+v", expTraces)
}
}
开发者ID:nkhuyu,项目名称:cockroach,代码行数:99,代码来源:tracer_test.go
注:本文中的github.com/cockroachdb/cockroach/util.NewFeed函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论