在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
了解grpc/protobufgRPC是一个高性能、通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言。 Protobuf(Protocol Buffers),是 Google 开发的一种跨语言、跨平台的可扩展机制,用于序列化结构化数据。 官网地址:https://developers.google.cn/protocol-buffers/ Grpc中文文档:http://doc.oschina.net/grpc?t=60133
优点:
缺点:
环境配置第一步:安装protobuf:
第二步:安装grpc:
第三步:安装
注意:上面这个文件也需要移到src目录下。 调用过程1、客户端(gRPC Stub)调用 A 方法,发起 RPC 调用。 2、对请求信息使用 Protobuf 进行对象序列化压缩(IDL)。 3、服务端(gRPC Server)接收到请求后,解码请求体,进行业务逻辑处理并返回。 4、对响应结果使用 Protobuf 进行对象序列化压缩(IDL)。 5、客户端接受到服务端响应,解码请求体。回调被调用的 A 方法,唤醒正在等待响应(阻塞)的客户端调用并返回响应结果。 调用方式一、Unary RPC:一元 RPCServertype SearchService struct{} func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) { return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil } const PORT = "9001" func main() { server := grpc.NewServer() pb.RegisterSearchServiceServer(server, &SearchService{}) lis, err := net.Listen("tcp", ":"+PORT) ... server.Serve(lis) }
Clientfunc main() { conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure()) ... defer conn.Close() client := pb.NewSearchServiceClient(conn) resp, err := client.Search(context.Background(), &pb.SearchRequest{ Request: "gRPC", }) ... }
二、Server-side streaming RPC:服务端流式 RPCServerfunc (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error { for n := 0; n <= 6; n++ { stream.Send(&pb.StreamResponse{ Pt: &pb.StreamPoint{ ... }, }) } return nil } Clientfunc printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.List(context.Background(), r) ... for { resp, err := stream.Recv() if err == io.EOF { break } ... } return nil } 三、Client-side streaming RPC:客户端流式 RPCServerfunc (s *StreamService) Record(stream pb.StreamService_RecordServer) error { for { r, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}}) } ... } return nil } Clientfunc printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Record(context.Background()) ... for n := 0; n < 6; n++ { stream.Send(r) } resp, err := stream.CloseAndRecv() ... return nil } 四、Bidirectional streaming RPC:双向流式 RPCServerfunc (s *StreamService) Route(stream pb.StreamService_RouteServer) error { for { stream.Send(&pb.StreamResponse{...}) r, err := stream.Recv() if err == io.EOF { return nil } ... } return nil } Clientfunc printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Route(context.Background()) ... for n := 0; n <= 6; n++ { stream.Send(r) resp, err := stream.Recv() if err == io.EOF { break } ... } stream.CloseSend() return nil } 浅谈理解
服务端为什么四行代码,就能够起一个 gRPC Server,内部做了什么逻辑。你有想过吗?接下来我们一步步剖析,看看里面到底是何方神圣。 一、初始化// grpc.NewServer() func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o(&opts) } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[io.Closer]bool), m: make(map[string]*service), quit: make(chan struct{}), done: make(chan struct{}), czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) ... return s } 这块比较简单,主要是实例 grpc.Server 并进行初始化动作。涉及如下:
二、注册pb.RegisterSearchServiceServer(server, &SearchService{}) 步骤一:Service API interface// search.pb.go type SearchServiceServer interface { Search(context.Context, *SearchRequest) (*SearchResponse, error) } func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) { s.RegisterService(&_SearchService_serviceDesc, srv) } 还记得我们平时编写的 Protobuf 吗?在生成出来的 步骤二:Service API IDL你想乱传糊弄一下?不可能的,请乖乖定义与 Protobuf 一致的接口方法。但是那个 // search.pb.go var _SearchService_serviceDesc = grpc.ServiceDesc{ ServiceName: "proto.SearchService", HandlerType: (*SearchServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Search", Handler: _SearchService_Search_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "search.proto", } 这看上去像服务的描述代码,用来向内部表述 “我” 都有什么。涉及如下:
步骤三:Register Servicefunc (s *Server) register(sd *ServiceDesc, ss interface{}) { ... srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { ... } s.m[sd.ServiceName] = srv } 在最后一步中,我们会将先前的服务接口信息、服务描述信息给注册到内部
小结在这一章节中,主要介绍的是 gRPC Server 在启动前的整理和注册行为,看上去很简单,但其实一切都是为了后续的实际运行的预先准备。因此我们整理一下思路,将其串联起来看看,如下: 三、监听接下来到了整个流程中,最重要也是大家最关注的监听/处理阶段,核心代码如下: func (s *Server) Serve(lis net.Listener) error { ... var tempDelay time.Duration for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } ... timer := time.NewTimer(tempDelay) select { case <-timer.C: case <-s.quit: timer.Stop() return nil } continue } ... return err } tempDelay = 0 s.serveWG.Add(1) go func() { s.handleRawConn(rawConn) s.serveWG.Done() }() } } Serve 会根据外部传入的 Listener 不同而调用不同的监听模式,这也是
客户端一、创建拨号连接// grpc.Dial(":"+PORT, grpc.WithInsecure()) func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, csMgr: &connectivityStateManager{}, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), blockingpicker: newPickerWrapper(), czData: new(channelzData), firstResolveEvent: grpcsync.NewEvent(), } ... chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) ... }
连没连之前听到有的人说调用
我们可以有几个核心方法一直在等待/处理信号,通过分析底层源码可得知。涉及如下:
在这里主要分析 goroutine 提示的 func (ac *addrConn) resetTransport() { for i := 0; ; i++ { if ac.state == connectivity.Shutdown { return } ... connectDeadline := time.Now().Add(dialDuration) ac.updateConnectivityState(connectivity.Connecting) newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) if err != nil { if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) timer := time.NewTimer(backoffFor) select { case <-timer.C: ... } continue } if ac.state == connectivity.Shutdown { newTr.Close() return } ... if !healthcheckManagingState { ac.updateConnectivityState(connectivity.Ready) } ... if ac.state == connectivity.Shutdown { return } ac.updateConnectivityState(connectivity.TransientFailure) } } 在该方法中会不断地去尝试创建连接,若成功则结束。否则不断地根据 二、实例化 Service APItype SearchServiceClient interface { Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) } type searchServiceClient struct { cc *grpc.ClientConn } func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient { return &searchServiceClient{cc} } 这块就是实例 Service API interface,比较简单。 三、调用// search.pb.go func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) { out := new(SearchResponse) err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...) if err != nil { return nil, err } return out, nil } proto 生成的 RPC 方法更像是一个包装盒,把需要的东西放进去,而实际上调用的还是 func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) } 通过概览,可以关注到三块调用。如下:
连接// clientconn.go func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ FullMethodName: method, }) if err != nil { return nil, nil, toRPCErr(err) } return t, done, nil } 在 四、关闭连接// conn.Close() func (cc *ClientConn) Close() error { defer cc.cancel() ... cc.csMgr.updateState(connectivity.Shutdown) ... cc.blockingpicker.close() if rWrapper != nil { rWrapper.close() } if bWrapper != nil { bWrapper.close() } for ac := range conns { ac.tearDown(ErrClientConnClosing) } if channelz.IsOn() { ... channelz.AddTraceEvent(cc.channelzID, ted) channelz.RemoveEntry(cc.channelzID) } return nil } 该方法会取消 ClientConn 上下文,同时关闭所有底层传输。涉及如下:
总结
参考
可以拷贝的代码见: https://github.com/EDDYCJY/blog/blob/master/golang/gRPC/2019-06-28-talking-grpc.md 原文链接:https://mp.weixin.qq.com/s/qet7FX26HGnXgLIG-lOSyw |
请发表评论