mirror of https://github.com/status-im/consul.git
areas: make the gRPC server tracker network area aware (#11748)
Fixes a bug whereby servers present in multiple network areas would be properly segmented in the Router, but not in the gRPC mirror of that state. As a result, servers in the current datacenter that left a network area (for example, during the area's removal) would delete their own records even though those records still legitimately existed in the standard WAN area. The gRPC client stack uses the gRPC server tracker to execute all RPCs, including those targeting members of the current datacenter (unlike the net/rpc stack, which has a bypass mechanism for that case). The bug manifested as a gRPC method call that never opened a socket, because it blocked forever waiting for the current datacenter's pool of servers to become non-empty.
parent a725f06c83
commit e20e6348dd
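For illustration only, here is a minimal, self-contained sketch of the area-keyed bookkeeping this patch introduces. The areaTracker type and its string-typed fields are hypothetical stand-ins for ServerResolverBuilder, types.AreaID, and metadata.Server, and "wan"/"custom-area" are made-up area IDs:

```go
package main

import "fmt"

// areaTracker keys servers by area first and by server ID second, so removing
// a server from one area no longer erases the record it still has in another.
type areaTracker struct {
	servers map[string]map[string]string // areaID -> serverID -> address
}

func (t *areaTracker) AddServer(areaID, id, addr string) {
	if t.servers == nil {
		t.servers = make(map[string]map[string]string)
	}
	if t.servers[areaID] == nil {
		t.servers[areaID] = make(map[string]string)
	}
	t.servers[areaID][id] = addr
}

func (t *areaTracker) RemoveServer(areaID, id string) {
	areaServers, ok := t.servers[areaID]
	if !ok {
		return // already gone
	}
	delete(areaServers, id)
	if len(areaServers) == 0 {
		delete(t.servers, areaID)
	}
}

func main() {
	var t areaTracker
	// The same server joins both the standard WAN area and a network area.
	t.AddServer("wan", "server-1", "10.0.0.1:8300")
	t.AddServer("custom-area", "server-1", "10.0.0.1:8300")

	// Leaving the network area no longer deletes the WAN record, which is the
	// behavior this commit fixes in the gRPC server tracker.
	t.RemoveServer("custom-area", "server-1")
	fmt.Println(t.servers) // map[wan:map[server-1:10.0.0.1:8300]]
}
```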
@@ -0,0 +1,3 @@
+```release-note:bug
+areas: **(Enterprise only)** make the gRPC server tracker network area aware
+```
@@ -20,6 +20,7 @@ import (
 	"github.com/hashicorp/consul/ipaddr"
 	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/tlsutil"
+	"github.com/hashicorp/consul/types"
 )
 
 // useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled
@@ -33,7 +34,7 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
 	t.Cleanup(logError(t, lis.Close))
 
 	builder := resolver.NewServerResolverBuilder(newConfig(t))
-	builder.AddServer(&metadata.Server{
+	builder.AddServer(types.AreaWAN, &metadata.Server{
 		Name:       "server-1",
 		ID:         "ID1",
 		Datacenter: "dc1",
@@ -84,14 +85,14 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
 	}()
 
 	builder := resolver.NewServerResolverBuilder(newConfig(t))
-	builder.AddServer(&metadata.Server{
+	builder.AddServer(types.AreaWAN, &metadata.Server{
 		Name:       "server-1",
 		ID:         "ID1",
 		Datacenter: "dc1",
 		Addr:       lis1.Addr(),
 		UseTLS:     true,
 	})
-	builder.AddServer(&metadata.Server{
+	builder.AddServer(types.AreaWAN, &metadata.Server{
 		Name:       "server-2",
 		ID:         "ID2",
 		Datacenter: "dc2",
@@ -153,7 +154,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
 	srv := newTestServer(t, "server-1", "dc1", tlsConf)
 
 	md := srv.Metadata()
-	res.AddServer(md)
+	res.AddServer(types.AreaWAN, md)
 	t.Cleanup(srv.shutdown)
 
 	pool := NewClientConnPool(ClientConnPoolConfig{
@@ -211,7 +212,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
 	}()
 
 	md := srv.Metadata()
-	res.AddServer(md)
+	res.AddServer(types.AreaWAN, md)
 	t.Cleanup(srv.shutdown)
 
 	clientTLSConf, err := tlsutil.NewConfigurator(tlsutil.Config{
@@ -266,7 +267,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
 	for i := 0; i < count; i++ {
 		name := fmt.Sprintf("server-%d", i)
 		srv := newTestServer(t, name, "dc1", nil)
-		res.AddServer(srv.Metadata())
+		res.AddServer(types.AreaWAN, srv.Metadata())
 		t.Cleanup(srv.shutdown)
 	}
 
@@ -280,7 +281,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
 	first, err := client.Something(ctx, &testservice.Req{})
 	require.NoError(t, err)
 
-	res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
+	res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
 
 	resp, err := client.Something(ctx, &testservice.Req{})
 	require.NoError(t, err)
@@ -302,7 +303,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
 	for i := 0; i < count; i++ {
 		name := fmt.Sprintf("server-%d", i)
 		srv := newTestServer(t, name, "dc1", nil)
-		res.AddServer(srv.Metadata())
+		res.AddServer(types.AreaWAN, srv.Metadata())
 		servers = append(servers, srv)
 		t.Cleanup(srv.shutdown)
 	}
@@ -352,7 +353,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
 	for i := 0; i < count; i++ {
 		name := fmt.Sprintf("server-%d", i)
 		srv := newTestServer(t, name, "dc1", nil)
-		res.AddServer(srv.Metadata())
+		res.AddServer(types.AreaWAN, srv.Metadata())
 		t.Cleanup(srv.shutdown)
 	}
 
@@ -406,7 +407,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
 	for _, dc := range dcs {
 		name := "server-0-" + dc
 		srv := newTestServer(t, name, dc, nil)
-		res.AddServer(srv.Metadata())
+		res.AddServer(types.AreaWAN, srv.Metadata())
 		t.Cleanup(srv.shutdown)
 	}
 
@@ -10,6 +10,7 @@ import (
 	"google.golang.org/grpc/resolver"
 
 	"github.com/hashicorp/consul/agent/metadata"
+	"github.com/hashicorp/consul/types"
 )
 
 // ServerResolverBuilder tracks the current server list and keeps any
@@ -18,9 +19,9 @@ type ServerResolverBuilder struct {
 	cfg Config
 	// leaderResolver is used to track the address of the leader in the local DC.
 	leaderResolver leaderResolver
-	// servers is an index of Servers by Server.ID. The map contains server IDs
+	// servers is an index of Servers by area and Server.ID. The map contains server IDs
 	// for all datacenters.
-	servers map[string]*metadata.Server
+	servers map[types.AreaID]map[string]*metadata.Server
 	// resolvers is an index of connections to the serverResolver which manages
 	// addresses of servers for that connection.
 	resolvers map[resolver.ClientConn]*serverResolver
@@ -37,7 +38,7 @@ type Config struct {
 func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
 	return &ServerResolverBuilder{
 		cfg:       cfg,
-		servers:   make(map[string]*metadata.Server),
+		servers:   make(map[types.AreaID]map[string]*metadata.Server),
 		resolvers: make(map[resolver.ClientConn]*serverResolver),
 	}
 }
@@ -72,9 +73,11 @@ func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadat
 	s.lock.RLock()
 	defer s.lock.RUnlock()
 
-	for _, server := range s.servers {
-		if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr {
-			return server, nil
+	for _, areaServers := range s.servers {
+		for _, server := range areaServers {
+			if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr {
+				return server, nil
+			}
 		}
 	}
 	return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr)
@@ -138,11 +141,17 @@ func (s *ServerResolverBuilder) Authority() string {
 }
 
 // AddServer updates the resolvers' states to include the new server's address.
-func (s *ServerResolverBuilder) AddServer(server *metadata.Server) {
+func (s *ServerResolverBuilder) AddServer(areaID types.AreaID, server *metadata.Server) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	s.servers[uniqueID(server)] = server
+	areaServers, ok := s.servers[areaID]
+	if !ok {
+		areaServers = make(map[string]*metadata.Server)
+		s.servers[areaID] = areaServers
+	}
+
+	areaServers[uniqueID(server)] = server
 
 	addrs := s.getDCAddrs(server.Datacenter)
 	for _, resolver := range s.resolvers {
@@ -168,11 +177,19 @@ func DCPrefix(datacenter, suffix string) string {
 }
 
 // RemoveServer updates the resolvers' states with the given server removed.
-func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) {
+func (s *ServerResolverBuilder) RemoveServer(areaID types.AreaID, server *metadata.Server) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	delete(s.servers, uniqueID(server))
+	areaServers, ok := s.servers[areaID]
+	if !ok {
+		return // already gone
+	}
+
+	delete(areaServers, uniqueID(server))
+	if len(areaServers) == 0 {
+		delete(s.servers, areaID)
+	}
 
 	addrs := s.getDCAddrs(server.Datacenter)
 	for _, resolver := range s.resolvers {
@@ -185,18 +202,29 @@ func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) {
 // getDCAddrs returns a list of the server addresses for the given datacenter.
 // This method requires that lock is held for reads.
 func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
-	var addrs []resolver.Address
-	for _, server := range s.servers {
-		if server.Datacenter != dc {
-			continue
-		}
+	var (
+		addrs         []resolver.Address
+		keptServerIDs = make(map[string]struct{})
+	)
+	for _, areaServers := range s.servers {
+		for _, server := range areaServers {
+			if server.Datacenter != dc {
+				continue
+			}
 
-		addrs = append(addrs, resolver.Address{
-			// NOTE: the address persisted here is only dialable using our custom dialer
-			Addr:       DCPrefix(server.Datacenter, server.Addr.String()),
-			Type:       resolver.Backend,
-			ServerName: server.Name,
-		})
+			// Servers may be part of multiple areas, so only include each one once.
+			if _, ok := keptServerIDs[server.ID]; ok {
+				continue
+			}
+			keptServerIDs[server.ID] = struct{}{}
+
+			addrs = append(addrs, resolver.Address{
+				// NOTE: the address persisted here is only dialable using our custom dialer
+				Addr:       DCPrefix(server.Datacenter, server.Addr.String()),
+				Type:       resolver.Backend,
+				ServerName: server.Name,
+			})
+		}
 	}
 	return addrs
 }
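With the area-keyed index, the nested loops in getDCAddrs can visit the same server once per area it belongs to, which is why the change adds the keptServerIDs set. A tiny standalone sketch of that dedup idiom, using made-up server IDs rather than Consul types:

```go
package main

import "fmt"

func main() {
	// The same server can be registered under several areas at once.
	serversByArea := map[string][]string{
		"wan":         {"server-1", "server-2"},
		"custom-area": {"server-1"},
	}

	kept := make(map[string]struct{})
	var addrs []string
	for _, ids := range serversByArea {
		for _, id := range ids {
			if _, ok := kept[id]; ok {
				continue // already emitted for another area
			}
			kept[id] = struct{}{}
			addrs = append(addrs, id)
		}
	}
	fmt.Println(len(addrs)) // 2 addresses, not 3
}
```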
@@ -1,13 +1,16 @@
 package router
 
-import "github.com/hashicorp/consul/agent/metadata"
+import (
+	"github.com/hashicorp/consul/agent/metadata"
+	"github.com/hashicorp/consul/types"
+)
 
 // ServerTracker is called when Router is notified of a server being added or
 // removed.
 type ServerTracker interface {
 	NewRebalancer(dc string) func()
-	AddServer(*metadata.Server)
-	RemoveServer(*metadata.Server)
+	AddServer(types.AreaID, *metadata.Server)
+	RemoveServer(types.AreaID, *metadata.Server)
 }
 
 // Rebalancer is called periodically to re-order the servers so that the load on the
@@ -24,7 +27,7 @@ func (NoOpServerTracker) NewRebalancer(string) func() {
 }
 
 // AddServer does nothing
-func (NoOpServerTracker) AddServer(*metadata.Server) {}
+func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {}
 
 // RemoveServer does nothing
-func (NoOpServerTracker) RemoveServer(*metadata.Server) {}
+func (NoOpServerTracker) RemoveServer(types.AreaID, *metadata.Server) {}
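Because the ServerTracker interface now takes a types.AreaID, every implementation has to change in lockstep with NoOpServerTracker above. A hedged compile-check sketch, assuming only the interface shown in this diff; logTracker is a hypothetical example implementation and is not part of the change:

```go
package router

import (
	"log"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/types"
)

// logTracker only logs membership changes; it exists to show the updated
// method signatures that area-aware trackers must implement.
type logTracker struct{}

func (logTracker) NewRebalancer(dc string) func() { return func() {} }

func (logTracker) AddServer(areaID types.AreaID, srv *metadata.Server) {
	log.Printf("added server %s to area %s", srv.ID, areaID)
}

func (logTracker) RemoveServer(areaID types.AreaID, srv *metadata.Server) {
	log.Printf("removed server %s from area %s", srv.ID, areaID)
}

// Compile-time check that logTracker satisfies the area-aware interface.
var _ ServerTracker = logTracker{}
```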
@@ -175,7 +175,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
 			continue
 		}
 
-		if err := r.addServer(area, parts); err != nil {
+		if err := r.addServer(areaID, area, parts); err != nil {
 			return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err)
 		}
 	}
@@ -276,7 +276,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
 }
 
 // addServer does the work of AddServer once the write lock is held.
-func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
+func (r *Router) addServer(areaID types.AreaID, area *areaInfo, s *metadata.Server) error {
 	// Make the manager on the fly if this is the first we've seen of it,
 	// and add it to the index.
 	manager := r.maybeInitializeManager(area, s.Datacenter)
@@ -288,7 +288,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
 	}
 
 	manager.AddServer(s)
-	r.grpcServerTracker.AddServer(s)
+	r.grpcServerTracker.AddServer(areaID, s)
 	return nil
 }
 
@@ -302,7 +302,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
 	if !ok {
 		return fmt.Errorf("area ID %q does not exist", areaID)
 	}
-	return r.addServer(area, s)
+	return r.addServer(areaID, area, s)
 }
 
 // RemoveServer should be called whenever a server is removed from an area. This
@@ -324,7 +324,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
 		return nil
 	}
 	info.manager.RemoveServer(s)
-	r.grpcServerTracker.RemoveServer(s)
+	r.grpcServerTracker.RemoveServer(areaID, s)
 
 	// If this manager is empty then remove it so we don't accumulate cruft
 	// and waste time during request routing.