Files
unitech-golib/qgrpc/op.go

142 lines
3.4 KiB
Go
Raw Normal View History

2020-04-06 19:57:19 +08:00
package qgrpc
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
2020-04-16 00:49:26 +08:00
"unitech/golib/qgrpc/ping"
"unitech/golib/tlog"
2020-04-06 19:57:19 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"strings"
"time"
)
// Server is the server-side handle returned by ServerRegist. It keeps the
// caller's configuration together with the etcd client and lease ID.
type Server struct {
	// userCfg is the configuration that was passed to ServerRegist.
	userCfg *ServerConfig
	// etcdCli is the etcd client obtained from etcdDialWrapper.
	etcdCli *clientv3.Client
	// etcdId is the lease ID that Revoke hands to etcd. It is not assigned
	// anywhere in the visible part of this file.
	etcdId clientv3.LeaseID
}
// Client is the client-side handle returned by NewClient. It owns one gRPC
// connection per watched server.
type Client struct {
	// userCfg is the configuration that was passed to NewClient.
	userCfg *ClientConfig
	// etcdCli is the etcd client obtained from etcdDialWrapper.
	etcdCli *clientv3.Client
	// etcdId is an etcd lease ID; it is not referenced in the visible part
	// of this file.
	etcdId clientv3.LeaseID
	// rpcServers maps a server name (RpcServer.Name) to the gRPC connection
	// dialed for it in NewClient.
	rpcServers map[string]*grpc.ClientConn
}
// RpcServer is the parsed form of a watch-server entry, as produced by
// ParseServerName.
type RpcServer struct {
	// Name is the name of the service.
	Name string
	// Addrs holds the default addresses of the service.
	Addrs []string
}
// DFT_DIAL_TIMEOUT is the etcd dial timeout, in seconds, that
// etcdDialWrapper uses when the configured DialTimeout is not positive.
const DFT_DIAL_TIMEOUT = 3
// etcdDialWrapper creates an etcd v3 client for the endpoints listed in
// cfg.EndPoints.
//
// The dial timeout is cfg.DialTimeout seconds; when that value is zero or
// negative, DFT_DIAL_TIMEOUT seconds is used instead. The client and error
// are returned exactly as clientv3.New produces them.
func etcdDialWrapper(cfg *BasicConfig) (*clientv3.Client, error) {
	seconds := cfg.DialTimeout
	if seconds <= 0 {
		seconds = DFT_DIAL_TIMEOUT
	}

	etcdCfg := clientv3.Config{
		Endpoints:   cfg.EndPoints,
		DialTimeout: time.Duration(seconds) * time.Second,
	}
	return clientv3.New(etcdCfg)
}
// ServerRegist connects to etcd using cfg.BasicConfig and registers the
// server described by cfg.
//
// It returns the new Server on success. If the etcd client cannot be
// created, it returns nil and the dial error, and register is not called.
func ServerRegist(cfg *ServerConfig) (server *Server, err error) {
	etcdCli, dialErr := etcdDialWrapper(&cfg.BasicConfig)
	if dialErr != nil {
		return nil, dialErr
	}

	server = &Server{userCfg: cfg, etcdCli: etcdCli}
	// register is defined outside this view; its result, if any, is not
	// inspected here.
	server.register()
	return server, nil
}
// NewClient connects to etcd using cfg.BasicConfig and dials one gRPC
// connection for every entry of cfg.WatchServers.
//
// Each entry is parsed with ParseServerName. The resulting connection uses a
// round-robin balancer on top of the resolver returned by newresolver and is
// stored under the parsed server name. The keepalive interval is
// cfg.KeepAlive seconds, or 3 seconds when cfg.KeepAlive is not positive.
// Once every connection has been dialed, a background goroutine running
// (*Client).ping is started.
//
// If the etcd client cannot be created, or any gRPC dial fails, NewClient
// returns nil and that error. On a dial failure the connections opened so far
// and the etcd client are closed, and the ping goroutine is not started.
func NewClient(cfg *ClientConfig) (client *Client, err error) {
	client = &Client{userCfg: cfg}
	if client.etcdCli, err = etcdDialWrapper(&cfg.BasicConfig); err != nil {
		return nil, err
	}
	client.rpcServers = make(map[string]*grpc.ClientConn)

	keepAlive := 3
	if cfg.KeepAlive > 0 {
		keepAlive = cfg.KeepAlive
	}

	for _, watchServer := range cfg.WatchServers {
		rpcServer := ParseServerName(watchServer)
		resolver := newresolver(client, rpcServer.Name, rpcServer.Addrs)
		conn, dialErr := grpc.Dial("", grpc.WithInsecure(),
			grpc.WithTimeout(time.Second),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Duration(keepAlive) * time.Second}),
			grpc.WithBalancer(grpc.RoundRobin(resolver)))
		if dialErr != nil {
			// Fail on the first dial error instead of letting a later
			// iteration overwrite it, and never store a nil connection.
			// Cleanup is best effort: the dial error is the one worth
			// reporting, so Close errors are deliberately dropped.
			for _, opened := range client.rpcServers {
				_ = opened.Close()
			}
			_ = client.etcdCli.Close()
			return nil, dialErr
		}
		client.rpcServers[rpcServer.Name] = conn
	}

	go client.ping()
	return client, nil
}
// ParseServerName parses a watch-server entry of the form
// "name|addr1,addr2,..." into an RpcServer, for example:
//
//	user.User|127.0.0.1:9011 -> RpcServer{Name: "user.User", Addrs: []string{"127.0.0.1:9011"}}
//
// Name is everything before the first "|". When a second "|"-separated field
// is present it is split on "," into Addrs; each address has surrounding
// whitespace trimmed and empty addresses are dropped, so "name|" and
// "name|a, ,b," never produce blank entries. Fields after a second "|" are
// ignored. Addrs is nil when no usable default address is configured.
func ParseServerName(serverName string) *RpcServer {
	fields := strings.Split(serverName, "|")
	server := &RpcServer{Name: fields[0]}
	// No default addresses configured.
	if len(fields) < 2 {
		return server
	}
	for _, addr := range strings.Split(fields[1], ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			server.Addrs = append(server.Addrs, addr)
		}
	}
	return server
}
// GetConn looks up the gRPC connection registered for serverName.
// It returns nil when no connection is stored under that name.
func (c *Client) GetConn(serverName string) *grpc.ClientConn {
	conn := c.rpcServers[serverName]
	return conn
}
// Revoke unregisters the server from etcd by revoking the lease identified
// by etcdId. It returns the error reported by the etcd client, if any.
func (s *Server) Revoke() error {
	if _, err := s.etcdCli.Revoke(context.Background(), s.etcdId); err != nil {
		return err
	}
	return nil
}
// Call invokes the unary gRPC method named by method on the connection that
// belongs to the method's server.
//
// method is split on "/" and must yield at least three parts, as in
// "/<server>/<method>"; the second part is the key used to look up the
// connection dialed by NewClient. args and reply are the request and
// response messages, and opts are forwarded to grpc.Invoke.
//
// An error is logged and returned when method is malformed or when no usable
// connection is registered for its server; otherwise the result of
// grpc.Invoke is returned.
func (c *Client) Call(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
	parts := strings.Split(method, "/")
	if len(parts) < 3 {
		tlog.Errorf("invalid grpc method:%s", method)
		return fmt.Errorf("invalid grpc method:%s", method)
	}

	// A nil entry is treated like a missing one so that it is reported as an
	// error instead of being handed to grpc.Invoke.
	conn, ok := c.rpcServers[parts[1]]
	if !ok || conn == nil {
		tlog.Errorf("no grpc client available:%s", method)
		return fmt.Errorf("no grpc client available:%s", method)
	}

	return grpc.Invoke(ctx, method, args, reply, conn, opts...)
}
// ping calls "<server>/Ping" once per second on every connection in
// rpcServers and logs any call that fails.
//
// The loop has no exit condition, so the method never returns; NewClient
// runs it in its own goroutine. Entries whose connection is nil are logged
// and skipped rather than passed to grpc.Invoke.
//
// NOTE(review): the pings use context.Background(), so a call that never
// completes would stall all later pings — confirm whether a per-call
// deadline is wanted.
func (c *Client) ping() {
	ticker := time.NewTicker(time.Second)
	for range ticker.C {
		for server, conn := range c.rpcServers {
			if conn == nil {
				tlog.Errorf("qgrpc ping||server=%s||err=%s", server, "nil connection")
				continue
			}
			req, resp := &ping.NoArgs{}, &ping.NoArgs{}
			if err := grpc.Invoke(context.Background(), server+"/Ping", req, resp, conn); err != nil {
				tlog.Errorf("qgrpc ping||server=%s||err=%s", server, err)
			}
		}
	}
}