diff --git a/agent/agent.go b/agent/agent.go index 65d842fd43..0f2fe11db7 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -536,6 +536,10 @@ func (a *Agent) Start(ctx context.Context) error { } // Start the proxy config manager. + cacheName := cachetype.HealthServicesName + if a.config.UseStreamingBackend { + cacheName = cachetype.StreamingHealthServicesName + } a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ Cache: a.cache, Logger: a.logger.Named(logging.ProxyConfig), @@ -549,8 +553,9 @@ func (a *Agent) Start(ctx context.Context) error { Domain: a.config.DNSDomain, AltDomain: a.config.DNSAltDomain, }, - TLSConfigurator: a.tlsConfigurator, - IntentionDefaultAllow: intentionDefaultAllow, + TLSConfigurator: a.tlsConfigurator, + IntentionDefaultAllow: intentionDefaultAllow, + ServiceHealthCacheName: cacheName, }) if err != nil { return err diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index c703da4f06..bcd7f6df44 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -4,11 +4,12 @@ import ( "errors" "sync" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" ) var ( @@ -71,6 +72,9 @@ type ManagerConfig struct { Logger hclog.Logger TLSConfigurator *tlsutil.Configurator + // TODO: replace this field with a type that exposes Notify + ServiceHealthCacheName string + // IntentionDefaultAllow is set by the agent so that we can pass this // information to proxies that need to make intention decisions on their // own. @@ -187,7 +191,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string } var err error - state, err = newState(ns, token) + state, err = newState(ns, token, m.ManagerConfig.ServiceHealthCacheName) if err != nil { return err } diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 01b896d293..0c30f2cf58 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -342,7 +342,13 @@ func testManager_BasicLifecycle( state.TriggerSyncChanges = func() {} // Create manager - m, err := NewManager(ManagerConfig{c, state, source, DNSConfig{}, logger, nil, false}) + m, err := NewManager(ManagerConfig{ + Cache: c, + State: state, + Source: source, + Logger: logger, + ServiceHealthCacheName: cachetype.HealthServicesName, + }) require.NoError(err) // And run it diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 59d11a2f11..27663ca08b 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -9,13 +9,14 @@ import ( "strings" "time" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/copystructure" + "github.com/mitchellh/mapstructure" + "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" - "github.com/mitchellh/copystructure" - "github.com/mitchellh/mapstructure" ) type CacheNotifier interface { @@ -72,6 +73,9 @@ type state struct { proxyCfg structs.ConnectProxyConfig token string + // TODO: replace this field with a type that exposes Notify + serviceHealthCacheName string + ch chan cache.UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot @@ -120,7 +124,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error // // The returned state needs its required dependencies to be set before Watch // can be called. -func newState(ns *structs.NodeService, token string) (*state, error) { +func newState(ns *structs.NodeService, token string, serviceHealthCacheName string) (*state, error) { switch ns.Kind { case structs.ServiceKindConnectProxy: case structs.ServiceKindTerminatingGateway: @@ -155,6 +159,8 @@ func newState(ns *structs.NodeService, token string) (*state, error) { taggedAddresses: taggedAddresses, proxyCfg: proxyCfg, token: token, + + serviceHealthCacheName: serviceHealthCacheName, // 10 is fairly arbitrary here but allow for the 3 mandatory and a // reasonable number of upstream watches to all deliver their initial // messages in parallel without blocking the cache.Notify loops. It's not a @@ -225,7 +231,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri var finalMeta structs.EnterpriseMeta finalMeta.Merge(entMeta) - return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + return s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{ Token: s.token, @@ -443,7 +449,7 @@ func (s *state) initWatchesMeshGateway() error { return err } - err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err = s.cache.Notify(s.ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: structs.ConsulServiceName, @@ -969,7 +975,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config // Watch the health endpoint to discover endpoints for the service if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Service.Name, @@ -1267,7 +1273,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Name, diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 19a0ed22b4..3a58ac0747 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -6,12 +6,13 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" - "github.com/stretchr/testify/require" ) func TestStateChanged(t *testing.T) { @@ -111,7 +112,7 @@ func TestStateChanged(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - state, err := newState(tt.ns, tt.token) + state, err := newState(tt.ns, tt.token, cachetype.HealthServicesName) require.NoError(err) otherNS, otherToken := tt.mutate(*tt.ns, tt.token) require.Equal(tt.want, state.Changed(otherNS, otherToken)) @@ -1509,7 +1510,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { - state, err := newState(&tc.ns, "") + state, err := newState(&tc.ns, "", cachetype.HealthServicesName) // verify building the initial state worked require.NoError(t, err)