From e20e6348dd92c39e9cf7781b886f0a7fa4578486 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Mon, 6 Dec 2021 09:55:54 -0600 Subject: [PATCH] areas: make the gRPC server tracker network area aware (#11748) Fixes a bug whereby servers present in multiple network areas would be properly segmented in the Router, but not in the gRPC mirror. This would lead servers in the current datacenter leaving from a network area (possibly during the network area's removal) from deleting their own records that still exist in the standard WAN area. The gRPC client stack uses the gRPC server tracker to execute all RPCs, even those targeting members of the current datacenter (which is unlike the net/rpc stack which has a bypass mechanism). This would manifest as a gRPC method call never opening a socket because it would block forever waiting for the current datacenter's pool of servers to be non-empty. --- .changelog/11748.txt | 3 ++ agent/grpc/client_test.go | 21 +++++----- agent/grpc/resolver/resolver.go | 70 +++++++++++++++++++++++---------- agent/router/grpc.go | 13 +++--- agent/router/router.go | 10 ++--- 5 files changed, 76 insertions(+), 41 deletions(-) create mode 100644 .changelog/11748.txt diff --git a/.changelog/11748.txt b/.changelog/11748.txt new file mode 100644 index 0000000000..8917ed93f0 --- /dev/null +++ b/.changelog/11748.txt @@ -0,0 +1,3 @@ +```release-note:bug +areas: **(Enterprise only)** make the gRPC server tracker network area aware +``` diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 3fa90e218d..0dae1e236a 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/tlsutil" + "github.com/hashicorp/consul/types" ) // useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled @@ -33,7 +34,7 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) { t.Cleanup(logError(t, lis.Close)) builder := resolver.NewServerResolverBuilder(newConfig(t)) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-1", ID: "ID1", Datacenter: "dc1", @@ -84,14 +85,14 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) { }() builder := resolver.NewServerResolverBuilder(newConfig(t)) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-1", ID: "ID1", Datacenter: "dc1", Addr: lis1.Addr(), UseTLS: true, }) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-2", ID: "ID2", Datacenter: "dc2", @@ -153,7 +154,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) { srv := newTestServer(t, "server-1", "dc1", tlsConf) md := srv.Metadata() - res.AddServer(md) + res.AddServer(types.AreaWAN, md) t.Cleanup(srv.shutdown) pool := NewClientConnPool(ClientConnPoolConfig{ @@ -211,7 +212,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T) }() md := srv.Metadata() - res.AddServer(md) + res.AddServer(types.AreaWAN, md) t.Cleanup(srv.shutdown) clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{ @@ -266,7 +267,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } @@ -280,7 +281,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { first, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) - res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"}) + res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"}) resp, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) @@ -302,7 +303,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) servers = append(servers, srv) t.Cleanup(srv.shutdown) } @@ -352,7 +353,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } @@ -406,7 +407,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) { for _, dc := range dcs { name := "server-0-" + dc srv := newTestServer(t, name, dc, nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index f6c3d7fe95..e77ee568d6 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/resolver" "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" ) // ServerResolverBuilder tracks the current server list and keeps any @@ -18,9 +19,9 @@ type ServerResolverBuilder struct { cfg Config // leaderResolver is used to track the address of the leader in the local DC. leaderResolver leaderResolver - // servers is an index of Servers by Server.ID. The map contains server IDs + // servers is an index of Servers by area and Server.ID. The map contains server IDs // for all datacenters. - servers map[string]*metadata.Server + servers map[types.AreaID]map[string]*metadata.Server // resolvers is an index of connections to the serverResolver which manages // addresses of servers for that connection. resolvers map[resolver.ClientConn]*serverResolver @@ -37,7 +38,7 @@ type Config struct { func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { return &ServerResolverBuilder{ cfg: cfg, - servers: make(map[string]*metadata.Server), + servers: make(map[types.AreaID]map[string]*metadata.Server), resolvers: make(map[resolver.ClientConn]*serverResolver), } } @@ -72,9 +73,11 @@ func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadat s.lock.RLock() defer s.lock.RUnlock() - for _, server := range s.servers { - if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { - return server, nil + for _, areaServers := range s.servers { + for _, server := range areaServers { + if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { + return server, nil + } } } return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr) @@ -138,11 +141,17 @@ func (s *ServerResolverBuilder) Authority() string { } // AddServer updates the resolvers' states to include the new server's address. -func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { +func (s *ServerResolverBuilder) AddServer(areaID types.AreaID, server *metadata.Server) { s.lock.Lock() defer s.lock.Unlock() - s.servers[uniqueID(server)] = server + areaServers, ok := s.servers[areaID] + if !ok { + areaServers = make(map[string]*metadata.Server) + s.servers[areaID] = areaServers + } + + areaServers[uniqueID(server)] = server addrs := s.getDCAddrs(server.Datacenter) for _, resolver := range s.resolvers { @@ -168,11 +177,19 @@ func DCPrefix(datacenter, suffix string) string { } // RemoveServer updates the resolvers' states with the given server removed. -func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { +func (s *ServerResolverBuilder) RemoveServer(areaID types.AreaID, server *metadata.Server) { s.lock.Lock() defer s.lock.Unlock() - delete(s.servers, uniqueID(server)) + areaServers, ok := s.servers[areaID] + if !ok { + return // already gone + } + + delete(areaServers, uniqueID(server)) + if len(areaServers) == 0 { + delete(s.servers, areaID) + } addrs := s.getDCAddrs(server.Datacenter) for _, resolver := range s.resolvers { @@ -185,18 +202,29 @@ func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { // getDCAddrs returns a list of the server addresses for the given datacenter. // This method requires that lock is held for reads. func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address { - var addrs []resolver.Address - for _, server := range s.servers { - if server.Datacenter != dc { - continue - } + var ( + addrs []resolver.Address + keptServerIDs = make(map[string]struct{}) + ) + for _, areaServers := range s.servers { + for _, server := range areaServers { + if server.Datacenter != dc { + continue + } - addrs = append(addrs, resolver.Address{ - // NOTE: the address persisted here is only dialable using our custom dialer - Addr: DCPrefix(server.Datacenter, server.Addr.String()), - Type: resolver.Backend, - ServerName: server.Name, - }) + // Servers may be part of multiple areas, so only include each one once. + if _, ok := keptServerIDs[server.ID]; ok { + continue + } + keptServerIDs[server.ID] = struct{}{} + + addrs = append(addrs, resolver.Address{ + // NOTE: the address persisted here is only dialable using our custom dialer + Addr: DCPrefix(server.Datacenter, server.Addr.String()), + Type: resolver.Backend, + ServerName: server.Name, + }) + } } return addrs } diff --git a/agent/router/grpc.go b/agent/router/grpc.go index c4fe96d25f..44600d42ad 100644 --- a/agent/router/grpc.go +++ b/agent/router/grpc.go @@ -1,13 +1,16 @@ package router -import "github.com/hashicorp/consul/agent/metadata" +import ( + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" +) // ServerTracker is called when Router is notified of a server being added or // removed. type ServerTracker interface { NewRebalancer(dc string) func() - AddServer(*metadata.Server) - RemoveServer(*metadata.Server) + AddServer(types.AreaID, *metadata.Server) + RemoveServer(types.AreaID, *metadata.Server) } // Rebalancer is called periodically to re-order the servers so that the load on the @@ -24,7 +27,7 @@ func (NoOpServerTracker) NewRebalancer(string) func() { } // AddServer does nothing -func (NoOpServerTracker) AddServer(*metadata.Server) {} +func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {} // RemoveServer does nothing -func (NoOpServerTracker) RemoveServer(*metadata.Server) {} +func (NoOpServerTracker) RemoveServer(types.AreaID, *metadata.Server) {} diff --git a/agent/router/router.go b/agent/router/router.go index 9aaae8739b..1389a30f6c 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -175,7 +175,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger continue } - if err := r.addServer(area, parts); err != nil { + if err := r.addServer(areaID, area, parts); err != nil { return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err) } } @@ -276,7 +276,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager { } // addServer does the work of AddServer once the write lock is held. -func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { +func (r *Router) addServer(areaID types.AreaID, area *areaInfo, s *metadata.Server) error { // Make the manager on the fly if this is the first we've seen of it, // and add it to the index. manager := r.maybeInitializeManager(area, s.Datacenter) @@ -288,7 +288,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { } manager.AddServer(s) - r.grpcServerTracker.AddServer(s) + r.grpcServerTracker.AddServer(areaID, s) return nil } @@ -302,7 +302,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error { if !ok { return fmt.Errorf("area ID %q does not exist", areaID) } - return r.addServer(area, s) + return r.addServer(areaID, area, s) } // RemoveServer should be called whenever a server is removed from an area. This @@ -324,7 +324,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error { return nil } info.manager.RemoveServer(s) - r.grpcServerTracker.RemoveServer(s) + r.grpcServerTracker.RemoveServer(areaID, s) // If this manager is empty then remove it so we don't accumulate cruft // and waste time during request routing.