diff --git a/agent/agent.go b/agent/agent.go index f7c2dff816..e429d9f86a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -50,6 +50,7 @@ import ( "github.com/hashicorp/consul/lib/file" "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" ) @@ -373,15 +374,21 @@ func New(bd BaseDeps) (*Agent, error) { cache: bd.Cache, } - cacheName := cachetype.HealthServicesName - if bd.RuntimeConfig.UseStreamingBackend { - cacheName = cachetype.StreamingHealthServicesName + // TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent + conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) + if err != nil { + return nil, err } + a.rpcClientHealth = &health.Client{ - Cache: bd.Cache, - NetRPC: &a, - CacheName: cacheName, - CacheNameNotStreaming: cachetype.HealthServicesName, + Cache: bd.Cache, + NetRPC: &a, + CacheName: cachetype.HealthServicesName, + ViewStore: bd.ViewStore, + MaterializerDeps: health.MaterializerDeps{ + Client: pbsubscribe.NewStateChangeSubscriptionClient(conn), + Logger: bd.Logger.Named("rpcclient.health"), + }, } a.serviceManager = NewServiceManager(&a) @@ -533,6 +540,8 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy) } + go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + // Start the proxy config manager. a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ Cache: a.cache, diff --git a/agent/agent_test.go b/agent/agent_test.go index 83d47d25d3..f01e739442 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" + "google.golang.org/grpc" "gopkg.in/square/go-jose.v2/jwt" "github.com/hashicorp/consul/agent/cache" @@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) { Logger: hclog.NewInterceptLogger(nil), Tokens: new(token.Store), TLSConfigurator: tlsConf, + GRPCConnPool: &fakeGRPCConnPool{}, }, RuntimeConfig: &config.RuntimeConfig{ HTTPAddrs: []net.Addr{ @@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) { } } +type fakeGRPCConnPool struct{} + +func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) { + return nil, nil +} + func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) { Logger: hclog.NewInterceptLogger(nil), Tokens: new(token.Store), TLSConfigurator: tlsConf, + GRPCConnPool: &fakeGRPCConnPool{}, }, RuntimeConfig: &config.RuntimeConfig{ HTTPAddrs: []net.Addr{ diff --git a/agent/consul/options.go b/agent/consul/options.go index 12507cb855..9c75a7b339 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -1,12 +1,13 @@ package consul import ( - "github.com/hashicorp/consul/agent/grpc" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" ) type Deps struct { @@ -15,5 +16,9 @@ type Deps struct { Tokens *token.Store Router *router.Router ConnPool *pool.ConnPool - GRPCConnPool *grpc.ClientConnPool + GRPCConnPool GRPCClientConner +} + +type GRPCClientConner interface { + ClientConn(datacenter string) (*grpc.ClientConn, error) } diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index d1888911d8..851b14aa07 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -5,14 +5,13 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" - agentgrpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/rpc/subscribe" "github.com/hashicorp/consul/agent/structs" ) type subscribeBackend struct { srv *Server - connPool *agentgrpc.ClientConnPool + connPool GRPCClientConner } // TODO: refactor Resolve methods to an ACLBackend that can be used by all