在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
package naming import ( "encoding/json" etcd "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/naming" ) // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. //GRPCResolver 创建一个 grpc.Watcher对于目标对象,追踪目标对象的改变 type GRPCResolver struct { // Client is an initialized etcd client. Client *etcd.Client } //监听对象更新 func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { switch nm.Op { case naming.Add: var v []byte if v, err = json.Marshal(nm); err != nil { return grpc.Errorf(codes.InvalidArgument, err.Error()) }
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) case naming.Delete: _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) default: return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op") }
return err } func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { ctx, cancel := context.WithCancel(context.Background()) w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} return w, nil } type gRPCWatcher struct { c *etcd.Client target string ctx context.Context cancel context.CancelFunc wch etcd.WatchChan err error } // Next gets the next set of updates from the etcd resolver. // Calls to Next should be serialized; concurrent calls are not safe since // there is no way to reconcile the update ordering. func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { if gw.wch == nil { // first Next() returns all addresses return gw.firstNext() }
if gw.err != nil { return nil, gw.err }
// process new events on target/* wr, ok := <-gw.wch if !ok { gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed") return nil, gw.err }
if gw.err = wr.Err(); gw.err != nil { return nil, gw.err }
updates := make([]*naming.Update, 0, len(wr.Events)) for _, e := range wr.Events { var jupdate naming.Update var err error switch e.Type { case etcd.EventTypePut: err = json.Unmarshal(e.Kv.Value, &jupdate) jupdate.Op = naming.Add case etcd.EventTypeDelete: err = json.Unmarshal(e.PrevKv.Value, &jupdate) jupdate.Op = naming.Delete }
if err == nil { updates = append(updates, &jupdate) }
}
return updates, nil } func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { // Use serialized request so resolution still works if the target etcd // server is partitioned away from the quorum. resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) if gw.err = err; err != nil { return nil, err }
updates := make([]*naming.Update, 0, len(resp.Kvs)) for _, kv := range resp.Kvs { var jupdate naming.Update if err := json.Unmarshal(kv.Value, &jupdate); err != nil { continue }
updates = append(updates, &jupdate) }
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) return updates, nil } func (gw *gRPCWatcher) Close() { gw.cancel() } |
请发表评论