Files
unitech-golib/gfetcd/server.go

143 lines
3.5 KiB
Go
Raw Normal View History

2020-04-06 19:57:19 +08:00
package gfetcd
import (
"context"
"fmt"
"net"
"strings"
"time"
2020-04-18 23:21:25 +08:00
"gitlab.com/unitechdev/golib/tlog"
2020-04-06 19:57:19 +08:00
"github.com/coreos/etcd/clientv3"
etcdnaming "github.com/coreos/etcd/clientv3/naming"
"github.com/siddontang/go/log"
"google.golang.org/grpc/naming"
)
// NameConfig is the server-side configuration used to register services in etcd.
type NameConfig struct {
BasicConfig
Servers []string `toml:"servers"` // service names to register; may list several
Addr string `toml:"addr"` // service address; GetAddr prepends the local IP when it starts with ":"
TTL int64 `toml:"ttl"` // lease TTL in seconds; NameRegist substitutes DEFAULT_LEASE_TTL when 0 (original note says default 5 — confirm)
UseNew bool `toml:"use_new"` // use the new address format; when false NameRegist also writes the legacy key
}
// GetAddr returns the address under which the service is advertised.
// When Addr holds only a port (it starts with ":"), the result of
// getLocalIP is placed in front of it; any other Addr is returned as is.
func (c *NameConfig) GetAddr() string {
	portOnly := strings.HasPrefix(c.Addr, ":")
	if !portOnly {
		return c.Addr
	}
	host := getLocalIP()
	return fmt.Sprintf("%s%s", host, c.Addr)
}
// getLocalIP returns the first private (RFC 1918) IPv4 address found on the
// local network interfaces, or "127.0.0.1" when none can be determined.
//
// Only 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16 are accepted. A plain
// "172." / "192." string-prefix match would also accept public addresses
// such as 172.32.0.1 or 192.0.2.1.
func getLocalIP() string {
	const fallback = "127.0.0.1"

	ifaces, err := net.Interfaces()
	if err != nil {
		// Interfaces cannot be enumerated, so there is nothing to inspect.
		return fallback
	}
	for _, iface := range ifaces {
		addrs, err := iface.Addrs()
		if err != nil {
			// Best effort: skip this interface and keep looking at the rest.
			continue
		}
		for _, addr := range addrs {
			var ip net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			}
			// To4 returns nil for a nil IP and for non-IPv4 addresses.
			ip4 := ip.To4()
			if ip4 == nil {
				continue
			}
			switch {
			case ip4[0] == 10, // 10.0.0.0/8
				ip4[0] == 172 && ip4[1]&0xf0 == 16, // 172.16.0.0/12
				ip4[0] == 192 && ip4[1] == 168: // 192.168.0.0/16
				return ip4.String()
			}
		}
	}
	return fallback
}
// Name is the handle NameRegist returns for a running service registration.
type Name struct {
userCfg *NameConfig // configuration that was passed to NameRegist
etcdCli *clientv3.Client // etcd client used to write and delete the service keys
stopSignal chan bool // Unregister sends on it to stop the registration goroutine
}
// NameRegist registers every service in cfg.Servers at cfg.GetAddr() in etcd
// and keeps the registration fresh from a background goroutine until
// Unregister is called on the returned Name.
//
// A non-positive cfg.TTL is replaced with DEFAULT_LEASE_TTL (cfg is modified
// in place). On every refresh the goroutine grants a new lease of cfg.TTL
// seconds and rewrites the keys with it: the resolver entry under
// ServiceKey("/new", cfg.Prefix, name) always, and additionally the legacy
// key ServiceKey(cfg.Prefix, name, addr) unless cfg.UseNew is set.
//
// An error is returned only when the etcd client cannot be created. Failures
// inside the refresh loop are logged and retried on the next tick.
func NameRegist(cfg *NameConfig) (name *Name, err error) {
	cli, err := etcdDial(&cfg.BasicConfig)
	if err != nil {
		tlog.Errorf("ETCD NAME||endpoints: %#v||err: %s", cfg.EndPoints, err)
		return nil, err
	}

	if cfg.TTL <= 0 {
		cfg.TTL = DEFAULT_LEASE_TTL
	}

	stop := make(chan bool, 1)
	name = &Name{
		userCfg:    cfg,
		etcdCli:    cli,
		stopSignal: stop,
	}

	// Refresh every TTL/2 seconds. Integer division yields 0 for a TTL of 1,
	// and time.NewTicker panics on a non-positive interval, so fall back to
	// half a second in that case.
	interval := time.Duration(cfg.TTL/2) * time.Second
	if interval <= 0 {
		interval = 500 * time.Millisecond
	}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		serviceAddr := cfg.GetAddr()
		resolver := &etcdnaming.GRPCResolver{Client: cli}

		// publish grants a fresh lease and (re)writes all service keys with it.
		publish := func() {
			lease, err := cli.Grant(context.Background(), cfg.TTL)
			if err != nil {
				tlog.Errorf("ETCD NAME||endpoints: %#v||grant err: %s",
					cfg.EndPoints, err)
				return
			}
			for _, serviceName := range cfg.Servers {
				serviceKey := ServiceKey("/new", cfg.Prefix, serviceName)
				if err = resolver.Update(context.Background(), serviceKey,
					naming.Update{Op: naming.Add, Addr: serviceAddr},
					clientv3.WithLease(lease.ID)); err != nil {
					tlog.Errorf("ETCD NAME||endpoints: %#v service: %s addr: %s||update err: %s",
						cfg.EndPoints, serviceKey, serviceAddr, err)
				}
				if cfg.UseNew {
					continue
				}
				serviceKey = ServiceKey(cfg.Prefix, serviceName, serviceAddr)
				if _, err = cli.Put(context.Background(), serviceKey, serviceAddr, clientv3.WithLease(lease.ID)); err != nil {
					tlog.Errorf("ETCD NAME||endpoints: %#v put key: %s value: %s||err: %s",
						cfg.EndPoints, serviceKey, serviceAddr, err)
				}
			}
		}

		for {
			publish()
			// Always wait here, even after a failed grant, so that an etcd
			// outage neither busy-loops nor hides the stop signal.
			select {
			case <-stop:
				tlog.Info("ETCD NAME||received stop signal")
				return
			case <-ticker.C:
			}
		}
	}()
	return name, nil
}
// Unregister signals the registration goroutine to stop and deletes this
// instance's keys from etcd: the resolver entry under
// ServiceKey("/new", Prefix, name) and the legacy key
// ServiceKey(Prefix, name, addr) for every configured service name.
//
// Delete failures are logged and do not abort the remaining deletions.
// Calling Unregister more than once does not block.
func (n *Name) Unregister() {
	// Non-blocking send: once the goroutine has exited nobody drains the
	// channel, so a plain send would block forever after the buffer fills.
	select {
	case n.stopSignal <- true:
	default:
	}

	serviceAddr := n.userCfg.GetAddr()
	resolver := &etcdnaming.GRPCResolver{Client: n.etcdCli}
	for _, serviceName := range n.userCfg.Servers {
		// Remove the resolver entry right away instead of leaving it
		// visible until its lease expires.
		resolverKey := ServiceKey("/new", n.userCfg.Prefix, serviceName)
		if err := resolver.Update(context.Background(), resolverKey,
			naming.Update{Op: naming.Delete, Addr: serviceAddr}); err != nil {
			tlog.Errorf("ETCD NAME||unregister service: %s addr: %s||err: %s",
				resolverKey, serviceAddr, err)
		}

		serviceKey := ServiceKey(n.userCfg.Prefix, serviceName, serviceAddr)
		if _, err := n.etcdCli.Delete(context.Background(), serviceKey); err != nil {
			tlog.Errorf("ETCD NAME||unregister key: %s||err: %s", serviceKey, err)
		}
	}
	log.Info("ETCD SERVER||unregister done")
}