grpc: move client conn pool to grpc package

This commit is contained in:
Daniel Nephin 2020-09-09 17:51:51 -04:00
parent f936ca5aea
commit 25f47b46e1
2 changed files with 38 additions and 33 deletions

View File

@ -1,4 +1,4 @@
package consul package grpc
import ( import (
"context" "context"
@ -6,7 +6,6 @@ import (
"net" "net"
"sync" "sync"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
@ -18,26 +17,24 @@ type ServerProvider interface {
Servers() []*metadata.Server Servers() []*metadata.Server
} }
type GRPCClient struct { type Client struct {
scheme string
serverProvider ServerProvider serverProvider ServerProvider
tlsConfigurator *tlsutil.Configurator tlsConfigurator *tlsutil.Configurator
grpcConns map[string]*grpc.ClientConn grpcConns map[string]*grpc.ClientConn
grpcConnLock sync.Mutex grpcConnLock sync.Mutex
} }
func NewGRPCClient(logger hclog.Logger, serverProvider ServerProvider, tlsConfigurator *tlsutil.Configurator, scheme string) *GRPCClient { func NewGRPCClient(serverProvider ServerProvider, tlsConfigurator *tlsutil.Configurator) *Client {
// Note we don't actually use the logger anywhere yet but I guess it was added // Note we don't actually use the logger anywhere yet but I guess it was added
// for future compatibility... // for future compatibility...
return &GRPCClient{ return &Client{
scheme: scheme,
serverProvider: serverProvider, serverProvider: serverProvider,
tlsConfigurator: tlsConfigurator, tlsConfigurator: tlsConfigurator,
grpcConns: make(map[string]*grpc.ClientConn), grpcConns: make(map[string]*grpc.ClientConn),
} }
} }
func (c *GRPCClient) GRPCConn(datacenter string) (*grpc.ClientConn, error) { func (c *Client) GRPCConn(datacenter string) (*grpc.ClientConn, error) {
c.grpcConnLock.Lock() c.grpcConnLock.Lock()
defer c.grpcConnLock.Unlock() defer c.grpcConnLock.Unlock()
@ -47,13 +44,14 @@ func (c *GRPCClient) GRPCConn(datacenter string) (*grpc.ClientConn, error) {
} }
dialer := newDialer(c.serverProvider, c.tlsConfigurator.OutgoingRPCWrapper()) dialer := newDialer(c.serverProvider, c.tlsConfigurator.OutgoingRPCWrapper())
conn, err := grpc.Dial(fmt.Sprintf("%s:///server.%s", c.scheme, datacenter), conn, err := grpc.Dial(fmt.Sprintf("%s:///server.%s", scheme, datacenter),
// use WithInsecure mode here because we handle the TLS wrapping in the // use WithInsecure mode here because we handle the TLS wrapping in the
// custom dialer based on logic around whether the server has TLS enabled. // custom dialer based on logic around whether the server has TLS enabled.
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithContextDialer(dialer), grpc.WithContextDialer(dialer),
grpc.WithDisableRetry(), grpc.WithDisableRetry(),
grpc.WithStatsHandler(grpcStatsHandler), // TODO: previously this handler was shared with the Handler. Is that necessary?
grpc.WithStatsHandler(&statsHandler{}),
grpc.WithBalancerName("pick_first")) grpc.WithBalancerName("pick_first"))
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -1,6 +1,7 @@
package consul package grpc
import ( import (
"context"
"math/rand" "math/rand"
"strings" "strings"
"sync" "sync"
@ -8,19 +9,22 @@ import (
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/serf/serf"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
var registerLock sync.Mutex //var registerLock sync.Mutex
//
//// registerResolverBuilder registers our custom grpc resolver with the given scheme.
//func registerResolverBuilder(datacenter string) *ServerResolverBuilder {
// registerLock.Lock()
// defer registerLock.Unlock()
// grpcResolverBuilder := NewServerResolverBuilder(datacenter)
// resolver.Register(grpcResolverBuilder)
// return grpcResolverBuilder
//}
// registerResolverBuilder registers our custom grpc resolver with the given scheme. type Nodes interface {
func registerResolverBuilder(scheme, datacenter string, shutdownCh <-chan struct{}) *ServerResolverBuilder { NumNodes() int
registerLock.Lock()
defer registerLock.Unlock()
grpcResolverBuilder := NewServerResolverBuilder(scheme, datacenter, shutdownCh)
resolver.Register(grpcResolverBuilder)
return grpcResolverBuilder
} }
// ServerResolverBuilder tracks the current server list and keeps any // ServerResolverBuilder tracks the current server list and keeps any
@ -32,24 +36,24 @@ type ServerResolverBuilder struct {
datacenter string datacenter string
servers map[string]*metadata.Server servers map[string]*metadata.Server
resolvers map[resolver.ClientConn]*ServerResolver resolvers map[resolver.ClientConn]*ServerResolver
shutdownCh <-chan struct{} nodes Nodes
lock sync.Mutex lock sync.Mutex
} }
func NewServerResolverBuilder(scheme, datacenter string, shutdownCh <-chan struct{}) *ServerResolverBuilder { func NewServerResolverBuilder(nodes Nodes, datacenter string) *ServerResolverBuilder {
return &ServerResolverBuilder{ return &ServerResolverBuilder{
scheme: scheme,
datacenter: datacenter, datacenter: datacenter,
nodes: nodes,
servers: make(map[string]*metadata.Server), servers: make(map[string]*metadata.Server),
resolvers: make(map[resolver.ClientConn]*ServerResolver), resolvers: make(map[resolver.ClientConn]*ServerResolver),
} }
} }
// periodicServerRebalance periodically reshuffles the order of server addresses // Run periodically reshuffles the order of server addresses
// within the resolvers to ensure the load is balanced across servers. // within the resolvers to ensure the load is balanced across servers.
func (s *ServerResolverBuilder) periodicServerRebalance(serf *serf.Serf) { func (s *ServerResolverBuilder) Run(ctx context.Context) {
// Compute the rebalance timer based on the number of local servers and nodes. // Compute the rebalance timer based on the number of local servers and nodes.
rebalanceDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), serf.NumNodes()) rebalanceDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), s.nodes.NumNodes())
timer := time.NewTimer(rebalanceDuration) timer := time.NewTimer(rebalanceDuration)
for { for {
@ -58,9 +62,9 @@ func (s *ServerResolverBuilder) periodicServerRebalance(serf *serf.Serf) {
s.rebalanceResolvers() s.rebalanceResolvers()
// Re-compute the wait duration. // Re-compute the wait duration.
newTimerDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), serf.NumNodes()) newTimerDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), s.nodes.NumNodes())
timer.Reset(newTimerDuration) timer.Reset(newTimerDuration)
case <-s.shutdownCh: case <-ctx.Done():
timer.Stop() timer.Stop()
return return
} }
@ -115,7 +119,7 @@ func (s *ServerResolverBuilder) Servers() []*metadata.Server {
// Build returns a new ServerResolver for the given ClientConn. The resolver // Build returns a new ServerResolver for the given ClientConn. The resolver
// will keep the ClientConn's state updated based on updates from Serf. // will keep the ClientConn's state updated based on updates from Serf.
func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -142,7 +146,10 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client
return resolver, nil return resolver, nil
} }
func (s *ServerResolverBuilder) Scheme() string { return s.scheme } // scheme is the URL scheme used to dial the Consul Server rpc endpoint.
var scheme = "consul"
func (s *ServerResolverBuilder) Scheme() string { return scheme }
// AddServer updates the resolvers' states to include the new server's address. // AddServer updates the resolvers' states to include the new server's address.
func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { func (s *ServerResolverBuilder) AddServer(server *metadata.Server) {
@ -232,9 +239,9 @@ func (r *ServerResolver) updateAddrsLocked(addrs []resolver.Address) {
r.lastAddrs = addrs r.lastAddrs = addrs
} }
func (s *ServerResolver) Close() { func (r *ServerResolver) Close() {
s.closeCallback() r.closeCallback()
} }
// Unneeded since we only update the ClientConn when our server list changes. // Unneeded since we only update the ClientConn when our server list changes.
func (*ServerResolver) ResolveNow(o resolver.ResolveNowOption) {} func (*ServerResolver) ResolveNow(_ resolver.ResolveNowOption) {}