142 lines
3.4 KiB
Go
142 lines
3.4 KiB
Go
package qgrpc
|
|
|
|
import (
|
|
"code.infininov.com/Infini/unitech-golib/qgrpc/ping"
|
|
"code.infininov.com/Infini/unitech-golib/tlog"
|
|
"context"
|
|
"fmt"
|
|
"github.com/coreos/etcd/clientv3"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/keepalive"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type Server struct {
|
|
userCfg *ServerConfig
|
|
etcdCli *clientv3.Client
|
|
etcdId clientv3.LeaseID
|
|
}
|
|
|
|
type Client struct {
|
|
userCfg *ClientConfig
|
|
etcdCli *clientv3.Client
|
|
etcdId clientv3.LeaseID
|
|
rpcServers map[string]*grpc.ClientConn
|
|
}
|
|
|
|
// RpcServer is the parsed form of one watch-server entry, as produced by
// ParseServerName.
type RpcServer struct {
	// Name is the name of the service.
	Name string
	// Addrs holds the default addresses configured for the service; it is
	// empty when the entry carries no default address.
	Addrs []string
}
|
|
|
|
// DFT_DIAL_TIMEOUT is the etcd dial timeout, in seconds, that etcdDialWrapper
// uses when the configuration does not supply a positive DialTimeout.
const DFT_DIAL_TIMEOUT = 3
|
|
|
|
func etcdDialWrapper(cfg *BasicConfig) (*clientv3.Client, error) {
|
|
dialTimeout := DFT_DIAL_TIMEOUT
|
|
if cfg.DialTimeout > 0 {
|
|
dialTimeout = cfg.DialTimeout
|
|
}
|
|
return clientv3.New(clientv3.Config{
|
|
Endpoints: cfg.EndPoints,
|
|
DialTimeout: time.Duration(dialTimeout) * time.Second,
|
|
})
|
|
}
|
|
|
|
func ServerRegist(cfg *ServerConfig) (server *Server, err error) {
|
|
server = &Server{userCfg: cfg}
|
|
|
|
if server.etcdCli, err = etcdDialWrapper(&cfg.BasicConfig); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
server.register()
|
|
|
|
return server, nil
|
|
}
|
|
|
|
func NewClient(cfg *ClientConfig) (client *Client, err error) {
|
|
client = &Client{userCfg: cfg}
|
|
|
|
if client.etcdCli, err = etcdDialWrapper(&cfg.BasicConfig); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client.rpcServers = make(map[string]*grpc.ClientConn)
|
|
|
|
keepAlive := 3
|
|
if cfg.KeepAlive > 0 {
|
|
keepAlive = cfg.KeepAlive
|
|
}
|
|
|
|
for _, watchServer := range cfg.WatchServers {
|
|
rpcServer := ParseServerName(watchServer)
|
|
resolver := newresolver(client, rpcServer.Name, rpcServer.Addrs)
|
|
rrBalancer := grpc.RoundRobin(resolver)
|
|
|
|
client.rpcServers[rpcServer.Name], err = grpc.Dial("", grpc.WithInsecure(),
|
|
grpc.WithTimeout(time.Second),
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Duration(keepAlive) * time.Second}),
|
|
grpc.WithBalancer(rrBalancer))
|
|
}
|
|
|
|
go client.ping()
|
|
|
|
return
|
|
}
|
|
|
|
// user.User|127.0.0.1:9011 -> Server{Name: user.User Addrs: []string{127.0.0.1:9011}}
|
|
func ParseServerName(serverName string) *RpcServer {
|
|
ss := strings.Split(serverName, "|")
|
|
s := &RpcServer{Name: ss[0]}
|
|
|
|
//没有配置默认地址
|
|
if len(ss) < 2 {
|
|
return s
|
|
}
|
|
|
|
s.Addrs = strings.Split(ss[1], ",")
|
|
return s
|
|
}
|
|
|
|
// 根据server-name 获得对应的连接池
|
|
func (self *Client) GetConn(serverName string) *grpc.ClientConn {
|
|
return self.rpcServers[serverName]
|
|
}
|
|
|
|
// 从etcd注销
|
|
func (self *Server) Revoke() error {
|
|
_, err := self.etcdCli.Revoke(context.Background(), self.etcdId)
|
|
return err
|
|
}
|
|
|
|
func (self *Client) Call(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
|
|
ss := strings.Split(method, "/")
|
|
fmt.Println("ss:", ss, len(ss))
|
|
if len(ss) < 3 {
|
|
tlog.Errorf("no grpc client available:%s", method)
|
|
return fmt.Errorf("no grpc client available:%s", method)
|
|
}
|
|
conn, ok := self.rpcServers[ss[1]]
|
|
fmt.Println("conn:", conn)
|
|
if !ok {
|
|
tlog.Errorf("no grpc client available:%s", method)
|
|
return fmt.Errorf("no grpc client available:%s", method)
|
|
}
|
|
err = grpc.Invoke(ctx, method, args, reply, conn, opts...)
|
|
return
|
|
}
|
|
|
|
func (self *Client) ping() {
|
|
for range time.NewTicker(time.Second).C {
|
|
for server, conn := range self.rpcServers {
|
|
req, resp := &ping.NoArgs{}, &ping.NoArgs{}
|
|
if err := grpc.Invoke(context.Background(), strings.Join([]string{server, "Ping"}, "/"), req, resp, conn); err != nil {
|
|
tlog.Errorf("qgrpc ping||server=%s||err=%s", server, err)
|
|
}
|
|
}
|
|
}
|
|
}
|