diff --git a/.changelog/11748.txt b/.changelog/11748.txt new file mode 100644 index 0000000000..8917ed93f0 --- /dev/null +++ b/.changelog/11748.txt @@ -0,0 +1,3 @@ +```release-note:bug +areas: **(Enterprise only)** make the gRPC server tracker network area aware +``` diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 3fa90e218d..0dae1e236a 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/tlsutil" + "github.com/hashicorp/consul/types" ) // useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled @@ -33,7 +34,7 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) { t.Cleanup(logError(t, lis.Close)) builder := resolver.NewServerResolverBuilder(newConfig(t)) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-1", ID: "ID1", Datacenter: "dc1", @@ -84,14 +85,14 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) { }() builder := resolver.NewServerResolverBuilder(newConfig(t)) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-1", ID: "ID1", Datacenter: "dc1", Addr: lis1.Addr(), UseTLS: true, }) - builder.AddServer(&metadata.Server{ + builder.AddServer(types.AreaWAN, &metadata.Server{ Name: "server-2", ID: "ID2", Datacenter: "dc2", @@ -153,7 +154,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) { srv := newTestServer(t, "server-1", "dc1", tlsConf) md := srv.Metadata() - res.AddServer(md) + res.AddServer(types.AreaWAN, md) t.Cleanup(srv.shutdown) pool := NewClientConnPool(ClientConnPoolConfig{ @@ -211,7 +212,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T) }() md := srv.Metadata() - res.AddServer(md) + res.AddServer(types.AreaWAN, md) t.Cleanup(srv.shutdown) clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{ @@ -266,7 +267,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } @@ -280,7 +281,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { first, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) - res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"}) + res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"}) resp, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) @@ -302,7 +303,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) servers = append(servers, srv) t.Cleanup(srv.shutdown) } @@ -352,7 +353,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { for i := 0; i < count; i++ { name := fmt.Sprintf("server-%d", i) srv := newTestServer(t, name, "dc1", nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } @@ -406,7 +407,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) { for _, dc := range dcs { name := "server-0-" + dc srv := newTestServer(t, name, dc, nil) - res.AddServer(srv.Metadata()) + res.AddServer(types.AreaWAN, srv.Metadata()) t.Cleanup(srv.shutdown) } diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index f6c3d7fe95..e77ee568d6 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/resolver" "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" ) // ServerResolverBuilder tracks the current server list and keeps any @@ -18,9 +19,9 @@ type ServerResolverBuilder struct { cfg Config // leaderResolver is used to track the address of the leader in the local DC. leaderResolver leaderResolver - // servers is an index of Servers by Server.ID. The map contains server IDs + // servers is an index of Servers by area and Server.ID. The map contains server IDs // for all datacenters. - servers map[string]*metadata.Server + servers map[types.AreaID]map[string]*metadata.Server // resolvers is an index of connections to the serverResolver which manages // addresses of servers for that connection. resolvers map[resolver.ClientConn]*serverResolver @@ -37,7 +38,7 @@ type Config struct { func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { return &ServerResolverBuilder{ cfg: cfg, - servers: make(map[string]*metadata.Server), + servers: make(map[types.AreaID]map[string]*metadata.Server), resolvers: make(map[resolver.ClientConn]*serverResolver), } } @@ -72,9 +73,11 @@ func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadat s.lock.RLock() defer s.lock.RUnlock() - for _, server := range s.servers { - if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { - return server, nil + for _, areaServers := range s.servers { + for _, server := range areaServers { + if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { + return server, nil + } } } return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr) @@ -138,11 +141,17 @@ func (s *ServerResolverBuilder) Authority() string { } // AddServer updates the resolvers' states to include the new server's address. -func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { +func (s *ServerResolverBuilder) AddServer(areaID types.AreaID, server *metadata.Server) { s.lock.Lock() defer s.lock.Unlock() - s.servers[uniqueID(server)] = server + areaServers, ok := s.servers[areaID] + if !ok { + areaServers = make(map[string]*metadata.Server) + s.servers[areaID] = areaServers + } + + areaServers[uniqueID(server)] = server addrs := s.getDCAddrs(server.Datacenter) for _, resolver := range s.resolvers { @@ -168,11 +177,19 @@ func DCPrefix(datacenter, suffix string) string { } // RemoveServer updates the resolvers' states with the given server removed. -func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { +func (s *ServerResolverBuilder) RemoveServer(areaID types.AreaID, server *metadata.Server) { s.lock.Lock() defer s.lock.Unlock() - delete(s.servers, uniqueID(server)) + areaServers, ok := s.servers[areaID] + if !ok { + return // already gone + } + + delete(areaServers, uniqueID(server)) + if len(areaServers) == 0 { + delete(s.servers, areaID) + } addrs := s.getDCAddrs(server.Datacenter) for _, resolver := range s.resolvers { @@ -185,18 +202,29 @@ func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { // getDCAddrs returns a list of the server addresses for the given datacenter. // This method requires that lock is held for reads. func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address { - var addrs []resolver.Address - for _, server := range s.servers { - if server.Datacenter != dc { - continue - } + var ( + addrs []resolver.Address + keptServerIDs = make(map[string]struct{}) + ) + for _, areaServers := range s.servers { + for _, server := range areaServers { + if server.Datacenter != dc { + continue + } - addrs = append(addrs, resolver.Address{ - // NOTE: the address persisted here is only dialable using our custom dialer - Addr: DCPrefix(server.Datacenter, server.Addr.String()), - Type: resolver.Backend, - ServerName: server.Name, - }) + // Servers may be part of multiple areas, so only include each one once. + if _, ok := keptServerIDs[server.ID]; ok { + continue + } + keptServerIDs[server.ID] = struct{}{} + + addrs = append(addrs, resolver.Address{ + // NOTE: the address persisted here is only dialable using our custom dialer + Addr: DCPrefix(server.Datacenter, server.Addr.String()), + Type: resolver.Backend, + ServerName: server.Name, + }) + } } return addrs } diff --git a/agent/router/grpc.go b/agent/router/grpc.go index c4fe96d25f..44600d42ad 100644 --- a/agent/router/grpc.go +++ b/agent/router/grpc.go @@ -1,13 +1,16 @@ package router -import "github.com/hashicorp/consul/agent/metadata" +import ( + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" +) // ServerTracker is called when Router is notified of a server being added or // removed. type ServerTracker interface { NewRebalancer(dc string) func() - AddServer(*metadata.Server) - RemoveServer(*metadata.Server) + AddServer(types.AreaID, *metadata.Server) + RemoveServer(types.AreaID, *metadata.Server) } // Rebalancer is called periodically to re-order the servers so that the load on the @@ -24,7 +27,7 @@ func (NoOpServerTracker) NewRebalancer(string) func() { } // AddServer does nothing -func (NoOpServerTracker) AddServer(*metadata.Server) {} +func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {} // RemoveServer does nothing -func (NoOpServerTracker) RemoveServer(*metadata.Server) {} +func (NoOpServerTracker) RemoveServer(types.AreaID, *metadata.Server) {} diff --git a/agent/router/router.go b/agent/router/router.go index 9aaae8739b..1389a30f6c 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -175,7 +175,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger continue } - if err := r.addServer(area, parts); err != nil { + if err := r.addServer(areaID, area, parts); err != nil { return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err) } } @@ -276,7 +276,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager { } // addServer does the work of AddServer once the write lock is held. -func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { +func (r *Router) addServer(areaID types.AreaID, area *areaInfo, s *metadata.Server) error { // Make the manager on the fly if this is the first we've seen of it, // and add it to the index. manager := r.maybeInitializeManager(area, s.Datacenter) @@ -288,7 +288,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { } manager.AddServer(s) - r.grpcServerTracker.AddServer(s) + r.grpcServerTracker.AddServer(areaID, s) return nil } @@ -302,7 +302,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error { if !ok { return fmt.Errorf("area ID %q does not exist", areaID) } - return r.addServer(area, s) + return r.addServer(areaID, area, s) } // RemoveServer should be called whenever a server is removed from an area. This @@ -324,7 +324,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error { return nil } info.manager.RemoveServer(s) - r.grpcServerTracker.RemoveServer(s) + r.grpcServerTracker.RemoveServer(areaID, s) // If this manager is empty then remove it so we don't accumulate cruft // and waste time during request routing.