• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

go-zero服务注册和发现

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

go-zero 服务注册和发现

在没有服务注册和发现的时候, 没新上一个服务, 或者没部署一个新的节点, 都要改所有调用方的配置文件, 简直就是配置噩梦, 还容易配置错误
分析一个go-zero的服务注册和发现,

接着上面的代码, go-zero实战, 看看rpc客户端怎么寻址到rpc服务端的

//logic调用的代码
regRsp, err := l.svcCtx.UserServiceRpc.Register(l.ctx, in)

//rpc/userserviceclient/userservice.go


func NewUserService(cli zrpc.Client) UserService {
   return &defaultUserService{
   	cli: cli,
   }
}

// 注册
func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
   //发起调用, 使用的是上面NewUserService里的zrpc.Client
   client := userService.NewUserServiceClient(m.cli.Conn())
   return client.Register(ctx, in)
}

//api/internal/svc/servicecontext.go 中调用的NewUserService
func NewServiceContext(c config.Config) *ServiceContext {
   return &ServiceContext{
   	Config:         c,
   	Model:          model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
   	UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
   }
}

//先看一下zrpc.MustNewClient 这个方法, 传入的配置文件中的etcd 的hosts和服务key, 跟进去看下这个方法
//github.com/tal-tech/go-zero/zrpc/client.go
//这个方法没啥, 继续往下面走
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
   cli, err := NewClient(c, options...)
   if err != nil {
   	log.Fatal(err)
   }

   return cli
}


//方法中主要方法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
   var opts []ClientOption
   if c.HasCredential() {
   	opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
   		App:   c.App,
   		Token: c.Token,
   	})))
   }
   if c.Timeout > 0 {
   	opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
   }
   opts = append(opts, options...)

   var client Client
   var err error
   if len(c.Endpoints) > 0 {
   	client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
   } else if err = c.Etcd.Validate(); err == nil {
   	client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
   }
   if err != nil {
   	return nil, err
   }

   return &RpcClient{
   	client: client,
   }, nil
}

//先看一下internal.BuildDiscovTarget, 这个方法入参是etcd的hosts和 服务的key, 返回的是一个类似url的东西, 协议是DiscovScheme = "discov"
func BuildDiscovTarget(endpoints []string, key string) string {
   return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
   	strings.Join(endpoints, resolver.EndpointSep), key)
}


//BuildDiscovTarget返回的url类似 : discov://127.0.0.1:2379/user-service, 传入NewClient中
//这个函数有两个核心逻辑 一个是grpc.WithBalancerName(p2c.Name)

// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
   var cli client
   opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
   if err := cli.dial(target, opts...); err != nil {
   	return nil, err
   }

   return &cli, nil
}

//WithBalancerName 这方法看名字知道是负载均衡的作用, 通过balancerName获取的, 对应入参的p2c.Name
func WithBalancerName(balancerName string) DialOption {
   builder := balancer.Get(balancerName)
   if builder == nil {
   	panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
   }
   return newFuncDialOption(func(o *dialOptions) {
   	o.balancerBuilder = builder
   })
}

//跟着p2c.Name进去, github.com/tal-tech/[email protected]/zrpc/internal/balancer/p2c/p2c.go
//在这里注入的负载均衡, 核心的逻辑在Pick中, 大致是一个可选就选一个, 两个就选择连接数最小的, 两个以上就随机两个出来进行选择
//如何选择的逻辑在choose(c1, c2 *subConn)方法中, 基本上就两个选连接数小的那个, 
func init() {
   balancer.Register(newBuilder())
}

type p2cPickerBuilder struct{}

func newBuilder() balancer.Builder {
   return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}

//....
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
   conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
   p.lock.Lock()
   defer p.lock.Unlock()

   var chosen *subConn
   switch len(p.conns) {
   case 0:
   	return nil, nil, balancer.ErrNoSubConnAvailable
   case 1:
   	chosen = p.choose(p.conns[0], nil)
   case 2:
   	chosen = p.choose(p.conns[0], p.conns[1])
   default:
   	var node1, node2 *subConn
   	for i := 0; i < pickTimes; i++ {
   		a := p.r.Intn(len(p.conns))
   		b := p.r.Intn(len(p.conns) - 1)
   		if b >= a {
   			b++
   		}
   		node1 = p.conns[a]
   		node2 = p.conns[b]
   		if node1.healthy() && node2.healthy() {
   			break
   		}
   	}

   	chosen = p.choose(node1, node2)
   }

   atomic.AddInt64(&chosen.inflight, 1)
   atomic.AddInt64(&chosen.requests, 1)
   return chosen.conn, p.buildDoneFunc(chosen), nil
}

//继续返回去看NewClient方法中的dial方法, 传入的是target, 也就是那个url, discov://127.0.0.1:2379/user-service
   if err := cli.dial(target, opts...); err != nil {
   	return nil, err
   }


//github.com/tal-tech/[email protected]/zrpc/internal/client.go
//这个方法本身没什么, 主要就是调用grpc.DialContext()方法, 这里就进入了grpc的逻辑了, 相当于通过grpc dial 了discov://127.0.0.1:2379/user-service, 继续进去看
func (c *client) dial(server string, opts ...ClientOption) error {
   options := c.buildDialOptions(opts...)
   timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
   defer cancel()
   conn, err := grpc.DialContext(timeCtx, server, options...)
   if err != nil {
   	service := server
   	if errors.Is(err, context.DeadlineExceeded) {
   		pos := strings.LastIndexByte(server, separator)
   		// len(server) - 1 is the index of last char
   		if 0 < pos && pos < len(server)-1 {
   			service = server[pos+1:]
   		}
   	}
   	return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
   		server, err.Error(), service)
   }

   c.conn = conn
   return nil
}

//google.golang.org/[email protected]/clientconn.go, 这个代码逻辑比较多, 我们找到我们关心的部分(服务发现), 就是如何解析discov://127.0.0.1:2379/user-service 成为一个ip:port, 通过分析发现cc.parsedTarget.Scheme, 也就是一开始拼接的discov字符串, 跟这getResolver方法进去
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
   cc := &ClientConn{
   	target:            target,
   	csMgr:             &connectivityStateManager{},
   	conns:             make(map[*addrConn]struct{}),
   	dopts:             defaultDialOptions(),
   	blockingpicker:    newPickerWrapper(),
   	czData:            new(channelzData),
   	firstResolveEvent: grpcsync.NewEvent(),
   }
   //.....
   //发现是通过Scheme去获取的, 也就是discov, 
   resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
   
   //....
   return cc, nil
}

//google.golang.org/[email protected]/resolver/resolver.go
//发现get是用map中读的, map数据是Register方法注入的, 返回到DiscovScheme = "discov"定义的地方, 看看有没有调用Register方法
var (
   // m is a map from scheme to resolver builder.
   m = make(map[string]Builder)
   // defaultScheme is the default scheme to use.
   defaultScheme = "passthrough"
)
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
   m[b.Scheme()] = b
}

// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
   if b, ok := m[scheme]; ok {
   	return b
   }
   return nil
}


//github.com/tal-tech/[email protected]/zrpc/internal/resolver/resolver.go
//注入的是discovBuilder, 继续看下discovBuilder的具体实现
func RegisterResolver() {
   resolver.Register(&dirBuilder)
   resolver.Register(&disBuilder)
}

//github.com/tal-tech/[email protected]/zrpc/internal/resolver/discovbuilder.go
//具体逻辑看函数中的知识

func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
   resolver.Resolver, error) {
   hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
   	return r == EndpointSepChar
   })
   //new一个服务发现的客户端, 里面基本上就是个etcd的封装, etcd的逻辑在NewSubscriber里面, 比较简单, 就写出来了
   sub, err := discov.NewSubscriber(hosts, target.Endpoint)
   if err != nil {
   	return nil, err
   }
   //拿到服务key的所有etcd中的数据
   update := func() {
   	var addrs []resolver.Address
   	for _, val := range subset(sub.Values(), subsetSize) {
   		addrs = append(addrs, resolver.Address{
   			Addr: val,
   		})
   	}
   	cc.UpdateState(resolver.State{
   		Addresses: addrs,
   	})
   }
   //实时监听etcd数据变化, 然后通过update方法更新数据到grpc的client
   sub.AddListener(update)
   //初始化的时候调用一次
   update()

   return &nopResolver{cc: cc}, nil
}

func (d *discovBuilder) Scheme() string {
   return DiscovScheme
}
//到这里客户端的服务发现就结束了

//服务注册的代码在rpc/userservice.go中, MustNewServer调用NewServer
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
   server, err := NewServer(c, register)
   if err != nil {
   	log.Fatal(err)
   }

   return server
}

//NewServer调用的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics))
//这个方法进去就是调用discov.NewPublishe
   	pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
//继续跟进去, 发现在KeepAlive()中会调用register方法, 用etcd的put方法注册到etcd中(client.Put)
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
   resp, err := client.Grant(client.Ctx(), TimeToLive)
   if err != nil {
   	return clientv3.NoLease, err
   }

   lease := resp.ID
   if p.id > 0 {
   	p.fullKey = makeEtcdKey(p.key, p.id)
   } else {
   	p.fullKey = makeEtcdKey(p.key, int64(lease))
   }
   _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

   return lease, err
}

总结

  1. go-zero的注册发现代码比较容易懂, 比较简单, 可以作为初步阅读源码的练手项目
  2. 业务上基本上是够用的

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go语言模仿Java8的StreamAPIstream发布时间:2022-07-10
下一篇:
go语言的安装、环境变量配置及简单使用发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap