diff --git a/agent/consul/subscribe_backend_test.go b/agent/consul/subscribe_backend_test.go index e7debf0ab5..5b412574e6 100644 --- a/agent/consul/subscribe_backend_test.go +++ b/agent/consul/subscribe_backend_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/require" gogrpc "google.golang.org/grpc" - grpcresolver "google.golang.org/grpc/resolver" grpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc/resolver" @@ -338,8 +337,11 @@ func TestSubscribeBackend_IntegrationWithServer_DeliversAllMessages(t *testing.T } func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *resolver.ServerResolverBuilder) { - builder := resolver.NewServerResolverBuilder(resolver.Config{Scheme: t.Name()}) - registerWithGRPC(builder) + builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: t.Name()}) + resolver.Register(builder) + t.Cleanup(func() { + resolver.Deregister(builder.Authority()) + }) _, config := testClientConfig(t) for _, op := range ops { @@ -361,19 +363,6 @@ func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *re return client, builder } -var grpcRegisterLock sync.Mutex - -// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. -// This function exists to synchronize registrations with a lock. -// grpc/resolver.Register expects all registration to happen at init and does -// not allow for concurrent registration. This function exists to support -// parallel testing. -func registerWithGRPC(b grpcresolver.Builder) { - grpcRegisterLock.Lock() - defer grpcRegisterLock.Unlock() - grpcresolver.Register(b) -} - type testLogger interface { Logf(format string, args ...interface{}) } diff --git a/agent/grpc/client.go b/agent/grpc/client.go index 8e43873828..e58d5c6496 100644 --- a/agent/grpc/client.go +++ b/agent/grpc/client.go @@ -25,10 +25,10 @@ type ClientConnPool struct { type ServerLocator interface { // ServerForAddr is used to look up server metadata from an address. ServerForAddr(addr string) (*metadata.Server, error) - // Scheme returns the url scheme to use to dial the server. This is primarily + // Authority returns the target authority to use to dial the server. This is primarily // needed for testing multiple agents in parallel, because gRPC requires the // resolver to be registered globally. - Scheme() string + Authority() string } // TLSWrapper wraps a non-TLS connection and returns a connection with TLS @@ -58,7 +58,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) } conn, err := grpc.Dial( - fmt.Sprintf("%s:///server.%s", c.servers.Scheme(), datacenter), + fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter), // use WithInsecure mode here because we handle the TLS wrapping in the // custom dialer based on logic around whether the server has TLS enabled. grpc.WithInsecure(), diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 5028e34fa9..49922a3098 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -117,7 +117,7 @@ func newConfig(t *testing.T) resolver.Config { n := t.Name() s := strings.Replace(n, "/", "", -1) s = strings.Replace(s, "_", "", -1) - return resolver.Config{Scheme: strings.ToLower(s)} + return resolver.Config{Authority: strings.ToLower(s)} } func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { @@ -195,3 +195,10 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) { require.Equal(t, resp.Datacenter, dc) } } + +func registerWithGRPC(t *testing.T, b *resolver.ServerResolverBuilder) { + resolver.Register(b) + t.Cleanup(func() { + resolver.Deregister(b.Authority()) + }) +} diff --git a/agent/grpc/resolver/registry.go b/agent/grpc/resolver/registry.go new file mode 100644 index 0000000000..d305b607d7 --- /dev/null +++ b/agent/grpc/resolver/registry.go @@ -0,0 +1,54 @@ +package resolver + +import ( + "fmt" + "sync" + + "google.golang.org/grpc/resolver" +) + +// registry of ServerResolverBuilder. This type exists because grpc requires that +// resolvers are registered globally before any requests are made. This is +// incompatible with our resolver implementation and testing strategy, which +// requires a different Resolver for each test. +type registry struct { + lock sync.RWMutex + byAuthority map[string]*ServerResolverBuilder +} + +func (r *registry) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { + r.lock.RLock() + defer r.lock.RUnlock() + res, ok := r.byAuthority[target.Authority] + if !ok { + return nil, fmt.Errorf("no resolver registered for %v", target.Authority) + } + return res.Build(target, cc, opts) +} + +func (r *registry) Scheme() string { + return "consul" +} + +var _ resolver.Builder = (*registry)(nil) + +var reg = ®istry{byAuthority: make(map[string]*ServerResolverBuilder)} + +func init() { + resolver.Register(reg) +} + +// Register a ServerResolverBuilder with the global registry. +func Register(res *ServerResolverBuilder) { + reg.lock.Lock() + defer reg.lock.Unlock() + reg.byAuthority[res.Authority()] = res +} + +// Deregister the ServerResolverBuilder associated with the authority. Only used +// for testing. +func Deregister(authority string) { + reg.lock.Lock() + defer reg.lock.Unlock() + delete(reg.byAuthority, authority) +} diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index 76a2188d2f..b3eae815ff 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -15,9 +15,7 @@ import ( // ServerResolverBuilder tracks the current server list and keeps any // ServerResolvers updated when changes occur. type ServerResolverBuilder struct { - // scheme used to query the server. Defaults to consul. Used to support - // parallel testing because gRPC registers resolvers globally. - scheme string + cfg Config // servers is an index of Servers by Server.ID. The map contains server IDs // for all datacenters. servers map[string]*metadata.Server @@ -28,25 +26,22 @@ type ServerResolverBuilder struct { lock sync.RWMutex } -var _ resolver.Builder = (*ServerResolverBuilder)(nil) - type Config struct { - // Scheme used to connect to the server. Defaults to consul. - Scheme string + // Authority used to query the server. Defaults to "". Used to support + // parallel testing because gRPC registers resolvers globally. + Authority string } func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { - if cfg.Scheme == "" { - cfg.Scheme = "consul" - } return &ServerResolverBuilder{ - scheme: cfg.Scheme, + cfg: cfg, servers: make(map[string]*metadata.Server), resolvers: make(map[resolver.ClientConn]*serverResolver), } } -// Rebalance shuffles the server list for resolvers in all datacenters. +// NewRebalancer returns a function which shuffles the server list for resolvers +// in all datacenters. func (s *ServerResolverBuilder) NewRebalancer(dc string) func() { shuffler := rand.New(rand.NewSource(time.Now().UnixNano())) return func() { @@ -112,7 +107,9 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client return resolver, nil } -func (s *ServerResolverBuilder) Scheme() string { return s.scheme } +func (s *ServerResolverBuilder) Authority() string { + return s.cfg.Authority +} // AddServer updates the resolvers' states to include the new server's address. func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { diff --git a/agent/grpc/server_test.go b/agent/grpc/server_test.go index b660a66a73..442b617d50 100644 --- a/agent/grpc/server_test.go +++ b/agent/grpc/server_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/resolver" "github.com/hashicorp/consul/agent/grpc/internal/testservice" "github.com/hashicorp/consul/agent/metadata" @@ -167,10 +166,3 @@ func (f *fakeRPCListener) handleConn(conn net.Conn) { conn.Close() } } - -func registerWithGRPC(t *testing.T, b resolver.Builder) { - resolver.Register(b) - t.Cleanup(func() { - resolver.UnregisterForTesting(b.Scheme()) - }) -} diff --git a/agent/setup.go b/agent/setup.go index 93fd9e6cc5..bfa4abfade 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -11,7 +11,6 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" - grpcresolver "google.golang.org/grpc/resolver" autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" @@ -105,7 +104,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) - registerWithGRPC(builder) + resolver.Register(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS) d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) @@ -169,19 +168,6 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil return pool } -var registerLock sync.Mutex - -// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. -// This function exists to synchronize registrations with a lock. -// grpc/resolver.Register expects all registration to happen at init and does -// not allow for concurrent registration. This function exists to support -// parallel testing. -func registerWithGRPC(b grpcresolver.Builder) { - registerLock.Lock() - defer registerLock.Unlock() - grpcresolver.Register(b) -} - // getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends // all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) { diff --git a/agent/submatview/store.go b/agent/submatview/store.go index cf99857089..80e9f30b7d 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -30,7 +30,7 @@ type Store struct { // idleTTL is the duration of time an entry should remain in the Store after the // last request for that entry has been terminated. It is a field on the struct - // so that it can be patched in tests without need a lock. + // so that it can be patched in tests without needing a global lock. idleTTL time.Duration } @@ -122,8 +122,8 @@ func (s *Store) Get(ctx context.Context, req Request) (Result, error) { defer cancel() result, err := materializer.getFromView(ctx, info.MinIndex) - // context.DeadlineExceeded is translated to nil to match the behaviour of - // agent/cache.Cache.Get. + // context.DeadlineExceeded is translated to nil to match the timeout + // behaviour of agent/cache.Cache.Get. if err == nil || errors.Is(err, context.DeadlineExceeded) { return result, nil }