149 lines
3.8 KiB
Go
149 lines
3.8 KiB
Go
|
|
package qgrpc
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"github.com/coreos/etcd/clientv3"
|
||
|
|
"github.com/coreos/etcd/mvcc/mvccpb"
|
||
|
|
"google.golang.org/grpc/naming"
|
||
|
|
"path"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
type watcher struct {
|
||
|
|
prefix string
|
||
|
|
addrs []string
|
||
|
|
c *clientv3.Client
|
||
|
|
hasStart bool
|
||
|
|
}
|
||
|
|
|
||
|
|
type resolver struct {
|
||
|
|
client *Client
|
||
|
|
name string
|
||
|
|
addrs []string
|
||
|
|
}
|
||
|
|
|
||
|
|
func newresolver(client *Client, name string, addrs []string) *resolver {
|
||
|
|
return &resolver{client: client, name: name, addrs: addrs}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (self *resolver) Resolve(name string) (naming.Watcher, error) {
|
||
|
|
return self.client.newwatcher(self.name, self.addrs), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (self *Client) newwatcher(name string, addrs []string) naming.Watcher {
|
||
|
|
prefix := path.Join(self.userCfg.Prefix, name)
|
||
|
|
return &watcher{prefix: prefix, addrs: addrs, c: self.etcdCli}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Close is a no-op. It does not close the etcd client held by the watcher.
func (w *watcher) Close() {
}
|
||
|
|
|
||
|
|
func (w *watcher) Next() ([]*naming.Update, error) {
|
||
|
|
//第一次启动从etcd上获取所有的地址同时加上默认的地址
|
||
|
|
if !w.hasStart {
|
||
|
|
updates := []*naming.Update{}
|
||
|
|
|
||
|
|
//etcd 的连接正常
|
||
|
|
if w.c != nil {
|
||
|
|
resp, err := w.c.Get(context.Background(), w.prefix, clientv3.WithPrefix())
|
||
|
|
if err == nil {
|
||
|
|
addrs := extractAddrs(resp)
|
||
|
|
|
||
|
|
for _, addr := range addrs {
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: addr})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
//将默认的地址加进入
|
||
|
|
for _, addr := range w.addrs {
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: addr})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)})
|
||
|
|
updates = append(updates, &naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(addr, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)})
|
||
|
|
}
|
||
|
|
|
||
|
|
w.hasStart = true
|
||
|
|
return updates, nil
|
||
|
|
|
||
|
|
}
|
||
|
|
|
||
|
|
if w.c == nil {
|
||
|
|
time.Sleep(time.Second)
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
rch := w.c.Watch(context.Background(), w.prefix, clientv3.WithPrefix())
|
||
|
|
for wresp := range rch {
|
||
|
|
for _, ev := range wresp.Events {
|
||
|
|
switch ev.Type {
|
||
|
|
case mvccpb.PUT:
|
||
|
|
_, value := string(ev.Kv.Key), string(ev.Kv.Value)
|
||
|
|
//fmt.Println(time.Now(), "[PUT]", key, value)
|
||
|
|
return []*naming.Update{
|
||
|
|
&naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: value},
|
||
|
|
&naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), "127.0.0.1:", -1)},
|
||
|
|
&naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), "localhost:", -1)},
|
||
|
|
&naming.Update{
|
||
|
|
Op: naming.Add,
|
||
|
|
Addr: strings.Replace(value, fmt.Sprintf("%s:", getLocalIP()), getHostName()+":", -1)},
|
||
|
|
}, nil
|
||
|
|
|
||
|
|
case mvccpb.DELETE:
|
||
|
|
_, value := string(ev.Kv.Key), string(ev.Kv.Value)
|
||
|
|
//fmt.Println(time.Now(), "[DELETE]", key)
|
||
|
|
return []*naming.Update{
|
||
|
|
&naming.Update{
|
||
|
|
Op: naming.Delete,
|
||
|
|
Addr: value,
|
||
|
|
},
|
||
|
|
}, nil
|
||
|
|
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func extractAddrs(resp *clientv3.GetResponse) []string {
|
||
|
|
addrs := []string{}
|
||
|
|
|
||
|
|
if resp == nil || resp.Kvs == nil {
|
||
|
|
return addrs
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, v := range resp.Kvs {
|
||
|
|
if v.Value != nil {
|
||
|
|
addrs = append(addrs, string(v.Value))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return addrs
|
||
|
|
}
|