package qgrpc import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "google.golang.org/grpc/naming" "path" "strings" "time" ) type watcher struct { prefix string addrs []string c *clientv3.Client hasStart bool } type resolver struct { client *Client name string addrs []string } func newresolver(client *Client, name string, addrs []string) *resolver { return &resolver{client: client, name: name, addrs: addrs} } func (self *resolver) Resolve(name string) (naming.Watcher, error) { return self.client.newwatcher(self.name, self.addrs), nil } func (self *Client) newwatcher(name string, addrs []string) naming.Watcher { prefix := path.Join(self.userCfg.Prefix, name) return &watcher{prefix: prefix, addrs: addrs, c: self.etcdCli} } func (w *watcher) Close() {} func (w *watcher) Next() ([]*naming.Update, error) { //第一次启动从etcd上获取所有的地址同时加上默认的地址 if !w.hasStart { updates := []*naming.Update{} //etcd 的连接正常 if w.c != nil { resp, err := w.c.Get(context.Background(), w.prefix, clientv3.WithPrefix()) if err == nil { addrs := extractAddrs(resp) for _, addr := range addrs { updates = append(updates, &naming.Update{ Op: naming.Add, Addr: addr}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)}) } } } //将默认的地址加进入 for _, addr := range w.addrs { updates = append(updates, &naming.Update{ Op: naming.Add, Addr: addr}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)}) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)}) } w.hasStart = true return updates, nil } if w.c == nil { time.Sleep(time.Second) return nil, nil } rch := w.c.Watch(context.Background(), w.prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: _, value := string(ev.Kv.Key), string(ev.Kv.Value) //fmt.Println(time.Now(), "[PUT]", key, value) return []*naming.Update{ &naming.Update{ Op: naming.Add, Addr: value}, &naming.Update{ Op: naming.Add, Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)}, &naming.Update{ Op: naming.Add, Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)}, &naming.Update{ Op: naming.Add, Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)}, }, nil case mvccpb.DELETE: _, value := string(ev.Kv.Key), string(ev.Kv.Value) //fmt.Println(time.Now(), "[DELETE]", key) return []*naming.Update{ &naming.Update{ Op: naming.Delete, Addr: value, }, }, nil } } } return nil, nil } func extractAddrs(resp *clientv3.GetResponse) []string { addrs := []string{} if resp == nil || resp.Kvs == nil { return addrs } for _, v := range resp.Kvs { if v.Value != nil { addrs = append(addrs, string(v.Value)) } } return addrs }