From 25f47b46e1aa60180fc2a21d3ddc6ea2ec17efff Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Wed, 9 Sep 2020 17:51:51 -0400
Subject: [PATCH] grpc: move client conn pool to grpc package

Move the gRPC client connection pool and the custom server resolver out
of the consul package and into the grpc package. Drop the unused logger
and scheme constructor arguments, and replace the direct serf and
shutdown-channel dependencies with a narrow Nodes interface and a
context.Context, so the rebalance loop is driven and stopped by the
caller.
---
 .../{consul/grpc_client.go => grpc/client.go}     | 20 +++----
 .../{consul/grpc_resolver.go => grpc/resolver.go} | 54 +++++++++++--------
 2 files changed, 38 insertions(+), 36 deletions(-)
 rename agent/{consul/grpc_client.go => grpc/client.go} (84%)
 rename agent/{consul/grpc_resolver.go => grpc/resolver.go} (84%)

diff --git a/agent/consul/grpc_client.go b/agent/grpc/client.go
similarity index 84%
rename from agent/consul/grpc_client.go
rename to agent/grpc/client.go
index 6e6d3df115..d2f9f32b27 100644
--- a/agent/consul/grpc_client.go
+++ b/agent/grpc/client.go
@@ -1,4 +1,4 @@
-package consul
+package grpc
 
 import (
 	"context"
@@ -6,7 +6,6 @@ import (
 	"net"
 	"sync"
 
-	"github.com/hashicorp/go-hclog"
 	"google.golang.org/grpc"
 
 	"github.com/hashicorp/consul/agent/metadata"
@@ -18,26 +17,22 @@ type ServerProvider interface {
 	Servers() []*metadata.Server
 }
 
-type GRPCClient struct {
-	scheme          string
+type Client struct {
 	serverProvider  ServerProvider
 	tlsConfigurator *tlsutil.Configurator
 	grpcConns       map[string]*grpc.ClientConn
 	grpcConnLock    sync.Mutex
 }
 
-func NewGRPCClient(logger hclog.Logger, serverProvider ServerProvider, tlsConfigurator *tlsutil.Configurator, scheme string) *GRPCClient {
-	// Note we don't actually use the logger anywhere yet but I guess it was added
-	// for future compatibility...
-	return &GRPCClient{
-		scheme:          scheme,
+func NewGRPCClient(serverProvider ServerProvider, tlsConfigurator *tlsutil.Configurator) *Client {
+	return &Client{
 		serverProvider:  serverProvider,
 		tlsConfigurator: tlsConfigurator,
 		grpcConns:       make(map[string]*grpc.ClientConn),
 	}
 }
 
-func (c *GRPCClient) GRPCConn(datacenter string) (*grpc.ClientConn, error) {
+func (c *Client) GRPCConn(datacenter string) (*grpc.ClientConn, error) {
 	c.grpcConnLock.Lock()
 	defer c.grpcConnLock.Unlock()
 
@@ -47,13 +42,14 @@ func (c *GRPCClient) GRPCConn(datacenter string) (*grpc.ClientConn, error) {
 	}
 
 	dialer := newDialer(c.serverProvider, c.tlsConfigurator.OutgoingRPCWrapper())
-	conn, err := grpc.Dial(fmt.Sprintf("%s:///server.%s", c.scheme, datacenter),
+	conn, err := grpc.Dial(fmt.Sprintf("%s:///server.%s", scheme, datacenter),
 		// use WithInsecure mode here because we handle the TLS wrapping in the
 		// custom dialer based on logic around whether the server has TLS enabled.
 		grpc.WithInsecure(),
 		grpc.WithContextDialer(dialer),
 		grpc.WithDisableRetry(),
-		grpc.WithStatsHandler(grpcStatsHandler),
+		// TODO: previously this handler was shared with the server Handler; is that still necessary?
+		grpc.WithStatsHandler(&statsHandler{}),
 		grpc.WithBalancerName("pick_first"))
 	if err != nil {
 		return nil, err
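
newDialer and statsHandler are defined elsewhere in the grpc package and are not part of this diff. For review context, the pattern GRPCConn relies on — dial plain TCP, then wrap the connection in TLS only when the target server requires it, which is why grpc.WithInsecure is safe above — can be sketched as follows. sketchDialer, the useTLS flag, and the per-connection error handling are illustrative assumptions, not the package's actual implementation; tlsutil.DCWrapper is the type returned by OutgoingRPCWrapper.

	// A minimal sketch, assuming tlsutil.DCWrapper is
	// func(dc string, conn net.Conn) (net.Conn, error).
	func sketchDialer(wrap tlsutil.DCWrapper, datacenter string, useTLS bool) func(context.Context, string) (net.Conn, error) {
		return func(ctx context.Context, addr string) (net.Conn, error) {
			var d net.Dialer
			conn, err := d.DialContext(ctx, "tcp", addr) // plain TCP; gRPC itself stays "insecure"
			if err != nil {
				return nil, err
			}
			if useTLS && wrap != nil {
				// TLS wrapping happens here, outside of gRPC.
				wrapped, err := wrap(datacenter, conn)
				if err != nil {
					conn.Close()
					return nil, err
				}
				conn = wrapped
			}
			return conn, nil
		}
	}

grpc.Dial never sees the TLS details; it hands every address produced by the resolver to this dialer.
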
diff --git a/agent/consul/grpc_resolver.go b/agent/grpc/resolver.go
similarity index 84%
rename from agent/consul/grpc_resolver.go
rename to agent/grpc/resolver.go
index 883bf4e24d..fa52af5d28 100644
--- a/agent/consul/grpc_resolver.go
+++ b/agent/grpc/resolver.go
@@ -1,6 +1,7 @@
-package consul
+package grpc
 
 import (
+	"context"
 	"math/rand"
 	"strings"
 	"sync"
@@ -8,19 +9,22 @@ import (
 
 	"github.com/hashicorp/consul/agent/metadata"
 	"github.com/hashicorp/consul/agent/router"
-	"github.com/hashicorp/serf/serf"
 	"google.golang.org/grpc/resolver"
 )
 
-var registerLock sync.Mutex
+//var registerLock sync.Mutex
+//
+//// registerResolverBuilder registers our custom grpc resolver.
+//func registerResolverBuilder(datacenter string) *ServerResolverBuilder {
+//	registerLock.Lock()
+//	defer registerLock.Unlock()
+//	grpcResolverBuilder := NewServerResolverBuilder(datacenter)
+//	resolver.Register(grpcResolverBuilder)
+//	return grpcResolverBuilder
+//}
 
-// registerResolverBuilder registers our custom grpc resolver with the given scheme.
-func registerResolverBuilder(scheme, datacenter string, shutdownCh <-chan struct{}) *ServerResolverBuilder {
-	registerLock.Lock()
-	defer registerLock.Unlock()
-	grpcResolverBuilder := NewServerResolverBuilder(scheme, datacenter, shutdownCh)
-	resolver.Register(grpcResolverBuilder)
-	return grpcResolverBuilder
+type Nodes interface {
+	NumNodes() int
 }
 
 // ServerResolverBuilder tracks the current server list and keeps any
@@ -31,25 +35,24 @@ type ServerResolverBuilder struct {
-	scheme     string
 	datacenter string
 	servers    map[string]*metadata.Server
 	resolvers  map[resolver.ClientConn]*ServerResolver
-	shutdownCh <-chan struct{}
+	nodes      Nodes
 	lock       sync.Mutex
 }
 
-func NewServerResolverBuilder(scheme, datacenter string, shutdownCh <-chan struct{}) *ServerResolverBuilder {
+func NewServerResolverBuilder(nodes Nodes, datacenter string) *ServerResolverBuilder {
 	return &ServerResolverBuilder{
-		scheme:     scheme,
 		datacenter: datacenter,
+		nodes:      nodes,
 		servers:    make(map[string]*metadata.Server),
 		resolvers:  make(map[resolver.ClientConn]*ServerResolver),
 	}
}
 
-// periodicServerRebalance periodically reshuffles the order of server addresses
+// Run periodically reshuffles the order of server addresses
 // within the resolvers to ensure the load is balanced across servers.
-func (s *ServerResolverBuilder) periodicServerRebalance(serf *serf.Serf) {
+func (s *ServerResolverBuilder) Run(ctx context.Context) {
 	// Compute the rebalance timer based on the number of local servers and nodes.
-	rebalanceDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), serf.NumNodes())
+	rebalanceDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), s.nodes.NumNodes())
 	timer := time.NewTimer(rebalanceDuration)
 
 	for {
@@ -58,9 +61,9 @@ func (s *ServerResolverBuilder) periodicServerRebalance(serf *serf.Serf) {
 			s.rebalanceResolvers()
 
 			// Re-compute the wait duration.
-			newTimerDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), serf.NumNodes())
+			newTimerDuration := router.ComputeRebalanceTimer(s.serversInDC(s.datacenter), s.nodes.NumNodes())
 			timer.Reset(newTimerDuration)
-		case <-s.shutdownCh:
+		case <-ctx.Done():
 			timer.Stop()
 			return
 		}
@@ -115,7 +118,7 @@ func (s *ServerResolverBuilder) Servers() []*metadata.Server {
 }
 
 // Build returns a new ServerResolver for the given ClientConn. The resolver
 // will keep the ClientConn's state updated based on updates from Serf.
-func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
@@ -142,7 +145,10 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
 	return resolver, nil
 }
 
-func (s *ServerResolverBuilder) Scheme() string { return s.scheme }
+// scheme is the URL scheme used to dial the Consul server RPC endpoint.
+var scheme = "consul"
+
+func (s *ServerResolverBuilder) Scheme() string { return scheme }
 
 // AddServer updates the resolvers' states to include the new server's address.
 func (s *ServerResolverBuilder) AddServer(server *metadata.Server) {
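
The new Nodes interface is what lets Run drop the direct *serf.Serf dependency: the rebalance loop only ever called serf.NumNodes(). A caller that still holds a serf instance can satisfy the interface with a small adapter; the serfNodes name below is illustrative, not part of this change:

	// Hypothetical adapter; (*serf.Serf).NumNodes already has the required shape.
	type serfNodes struct{ s *serf.Serf }

	func (n serfNodes) NumNodes() int { return n.s.NumNodes() }

This keeps the serf import out of the resolver code entirely, matching the import removal at the top of the file.
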
@@ -232,9 +238,9 @@ func (r *ServerResolver) updateAddrsLocked(addrs []resolver.Address) {
 	r.lastAddrs = addrs
 }
 
-func (s *ServerResolver) Close() {
-	s.closeCallback()
+func (r *ServerResolver) Close() {
+	r.closeCallback()
 }
 
 // Unneeded since we only update the ClientConn when our server list changes.
-func (*ServerResolver) ResolveNow(o resolver.ResolveNowOption) {}
+func (*ServerResolver) ResolveNow(_ resolver.ResolveNowOption) {}
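
With registration commented out above, the builder now has to be registered and run by the caller. A sketch of how the agent might wire the pieces together; nodes, serverProvider, tlsConfigurator, and shutdownCtx are assumed to come from the agent's existing setup and are not named anywhere in this patch:

	// Illustrative wiring only, inside some agent setup function.
	builder := NewServerResolverBuilder(nodes, "dc1")
	resolver.Register(builder) // google.golang.org/grpc/resolver; keyed by builder.Scheme()
	go builder.Run(shutdownCtx)

	pool := NewGRPCClient(serverProvider, tlsConfigurator)
	conn, err := pool.GRPCConn("dc1") // dials consul:///server.dc1 once and caches the conn
	if err != nil {
		return err
	}
	// conn is a *grpc.ClientConn, usable with any generated gRPC client.

Note that resolver.Register is process-global and the scheme is now the fixed string "consul", so the code appears to assume a single active ServerResolverBuilder per process; the registerLock that was commented out existed to guard exactly this registration.
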