diff --git a/agent/agent.go b/agent/agent.go index 578563bd8d..6b0911ba5b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -20,7 +20,6 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -41,6 +40,7 @@ import ( publicgrpc "github.com/hashicorp/consul/agent/grpc/public" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" + proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue" catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local" "github.com/hashicorp/consul/agent/rpcclient/health" @@ -56,6 +56,7 @@ import ( "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" ) @@ -627,10 +628,28 @@ func (a *Agent) Start(ctx context.Context) error { go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) // Start the proxy config manager. 
+ proxyDataSources := proxycfg.DataSources{ + CARoots: proxycfgglue.CacheCARoots(a.cache), + CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache), + ConfigEntry: proxycfgglue.CacheConfigEntry(a.cache), + ConfigEntryList: proxycfgglue.CacheConfigEntryList(a.cache), + Datacenters: proxycfgglue.CacheDatacenters(a.cache), + FederationStateListMeshGateways: proxycfgglue.CacheFederationStateListMeshGateways(a.cache), + GatewayServices: proxycfgglue.CacheGatewayServices(a.cache), + Health: proxycfgglue.Health(a.rpcClientHealth), + HTTPChecks: proxycfgglue.CacheHTTPChecks(a.cache), + Intentions: proxycfgglue.CacheIntentions(a.cache), + IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache), + InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache), + LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache), + PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache), + ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache), + ServiceList: proxycfgglue.CacheServiceList(a.cache), + } + a.fillEnterpriseProxyDataSources(&proxyDataSources) a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ - Cache: &proxycfg.CacheWrapper{Cache: a.cache}, - Health: &proxycfg.HealthWrapper{Health: a.rpcClientHealth}, - Logger: a.logger.Named(logging.ProxyConfig), + DataSources: proxyDataSources, + Logger: a.logger.Named(logging.ProxyConfig), Source: &structs.QuerySource{ Datacenter: a.config.Datacenter, Segment: a.config.SegmentName, diff --git a/agent/agent_oss.go b/agent/agent_oss.go index 43de920a51..39adbef81e 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" ) @@ -58,3 +59,5 @@ func (a *Agent) AgentEnterpriseMeta() *acl.EnterpriseMeta { } func 
(a *Agent) registerEntCache() {} + +func (*Agent) fillEnterpriseProxyDataSources(*proxycfg.DataSources) {} diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go new file mode 100644 index 0000000000..7dd7a8846a --- /dev/null +++ b/agent/proxycfg-glue/glue.go @@ -0,0 +1,153 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/rpcclient/health" + "github.com/hashicorp/consul/agent/structs" +) + +// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from +// the agent cache. +func CacheCARoots(c *cache.Cache) proxycfg.CARoots { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName} +} + +// CacheCompiledDiscoveryChain satisfies the proxycfg.CompiledDiscoveryChain +// interface by sourcing data from the agent cache. +func CacheCompiledDiscoveryChain(c *cache.Cache) proxycfg.CompiledDiscoveryChain { + return &cacheProxyDataSource[*structs.DiscoveryChainRequest]{c, cachetype.CompiledDiscoveryChainName} +} + +// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing +// data from the agent cache. +func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry { + return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName} +} + +// CacheConfigEntryList satisfies the proxycfg.ConfigEntryList interface by +// sourcing data from the agent cache. +func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList { + return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName} +} + +// CacheDatacenters satisfies the proxycfg.Datacenters interface by sourcing +// data from the agent cache. 
+func CacheDatacenters(c *cache.Cache) proxycfg.Datacenters { + return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName} + } + +// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways +// interface by sourcing data from the agent cache. +func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName} +} + +// CacheGatewayServices satisfies the proxycfg.GatewayServices interface by +// sourcing data from the agent cache. +func CacheGatewayServices(c *cache.Cache) proxycfg.GatewayServices { + return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.GatewayServicesName} +} + +// CacheHTTPChecks satisfies the proxycfg.HTTPChecks interface by sourcing +// data from the agent cache. +func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks { + return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName} +} + +// CacheIntentions satisfies the proxycfg.Intentions interface by sourcing data +// from the agent cache. +func CacheIntentions(c *cache.Cache) proxycfg.Intentions { + return &cacheProxyDataSource[*structs.IntentionQueryRequest]{c, cachetype.IntentionMatchName} +} + +// CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface +// by sourcing data from the agent cache. +func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams { + return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName} +} + +// CacheInternalServiceDump satisfies the proxycfg.InternalServiceDump +// interface by sourcing data from the agent cache. 
+func CacheInternalServiceDump(c *cache.Cache) proxycfg.InternalServiceDump { + return &cacheProxyDataSource[*structs.ServiceDumpRequest]{c, cachetype.InternalServiceDumpName} +} + +// CacheLeafCertificate satisfies the proxycfg.LeafCertificate interface by +// sourcing data from the agent cache. +func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate { + return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName} +} + +// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by +// sourcing data from the agent cache. +func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery { + return &cacheProxyDataSource[*structs.PreparedQueryExecuteRequest]{c, cachetype.PreparedQueryName} +} + +// CacheResolvedServiceConfig satisfies the proxycfg.ResolvedServiceConfig +// interface by sourcing data from the agent cache. +func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig { + return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName} +} + +// CacheServiceList satisfies the proxycfg.ServiceList interface by sourcing +// data from the agent cache. +func CacheServiceList(c *cache.Cache) proxycfg.ServiceList { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName} +} + +// cacheProxyDataSource implements a generic wrapper around the agent cache to +// provide data to the proxycfg.Manager. +type cacheProxyDataSource[ReqType cache.Request] struct { + c *cache.Cache + t string +} + +// Notify satisfies the interfaces used by proxycfg.Manager to source data by +// subscribing to notifications from the agent cache. 
+func (c *cacheProxyDataSource[ReqType]) Notify( + ctx context.Context, + req ReqType, + correlationID string, + ch chan<- proxycfg.UpdateEvent, +) error { + return c.c.NotifyCallback(ctx, c.t, req, correlationID, dispatchCacheUpdate(ctx, ch)) +} + +// Health wraps health.Client so that the proxycfg package doesn't need to +// reference cache.UpdateEvent directly. +func Health(client *health.Client) proxycfg.Health { + return &healthWrapper{client} +} + +type healthWrapper struct { + client *health.Client +} + +func (h *healthWrapper) Notify( + ctx context.Context, + req *structs.ServiceSpecificRequest, + correlationID string, + ch chan<- proxycfg.UpdateEvent, +) error { + return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ctx, ch)) +} + +func dispatchCacheUpdate(ctx context.Context, ch chan<- proxycfg.UpdateEvent) cache.Callback { + return func(ctx context.Context, e cache.UpdateEvent) { + u := proxycfg.UpdateEvent{ + CorrelationID: e.CorrelationID, + Result: e.Result, + Err: e.Err, + } + + select { + case ch <- u: + case <-ctx.Done(): + } + } +} diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index f5d469d2e5..8138a0ac00 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -32,7 +32,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget) // Watch for root changes - err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ + err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -42,7 +42,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e } // Watch the leaf cert - err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{ + err = s.dataSources.LeafCertificate.Notify(ctx, 
&cachetype.ConnectCALeafRequest{ Datacenter: s.source.Datacenter, Token: s.token, Service: s.proxyCfg.DestinationServiceName, @@ -53,7 +53,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e } // Watch for intention updates - err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{ + err = s.dataSources.Intentions.Notify(ctx, &structs.IntentionQueryRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Match: &structs.IntentionQueryMatch{ @@ -72,7 +72,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e } // Get information about the entire service mesh. - err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ + err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ Kind: structs.MeshConfig, Name: structs.MeshConfigMesh, Datacenter: s.source.Datacenter, @@ -84,7 +84,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e } // Watch for service check updates - err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{ + err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{ ServiceID: s.proxyCfg.DestinationServiceID, EnterpriseMeta: s.proxyID.EnterpriseMeta, }, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch) @@ -94,7 +94,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if s.proxyCfg.Mode == structs.ProxyModeTransparent { // When in transparent proxy we will infer upstreams from intentions with this source - err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{ + err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: 
s.proxyCfg.DestinationServiceName, @@ -156,7 +156,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e switch u.DestinationType { case structs.UpstreamDestTypePreparedQuery: - err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{ + err = s.dataSources.PreparedQuery.Notify(ctx, &structs.PreparedQueryExecuteRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval}, QueryIDOrName: u.DestinationName, @@ -196,7 +196,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e continue } - err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ + err = s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Name: u.DestinationName, diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go new file mode 100644 index 0000000000..694c3cbf10 --- /dev/null +++ b/agent/proxycfg/data_sources.go @@ -0,0 +1,162 @@ +package proxycfg + +import ( + "context" + + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/structs" +) + +// UpdateEvent contains new data for a resource we are subscribed to (e.g. an +// agent cache entry). +type UpdateEvent struct { + CorrelationID string + Result interface{} + Err error +} + +// DataSources contains the dependencies used to consume data used to configure +// proxies. +type DataSources struct { + // CARoots provides updates about the CA root certificates on a notification + // channel. + CARoots CARoots + // CompiledDiscoveryChain provides updates about a service's discovery chain + // on a notification channel. + CompiledDiscoveryChain CompiledDiscoveryChain + // ConfigEntry provides updates about a single config entry on a notification + // channel. 
+ ConfigEntry ConfigEntry + // ConfigEntryList provides updates about a list of config entries on a + // notification channel. + ConfigEntryList ConfigEntryList + // Datacenters provides updates about federated datacenters on a notification + // channel. + Datacenters Datacenters + // FederationStateListMeshGateways is the interface used to consume updates + // about mesh gateways from the federation state. + FederationStateListMeshGateways FederationStateListMeshGateways + // GatewayServices provides updates about a gateway's upstream services on a + // notification channel. + GatewayServices GatewayServices + // Health provides service health updates on a notification channel. + Health Health + // HTTPChecks provides updates about a service's HTTP and gRPC checks on a + // notification channel. + HTTPChecks HTTPChecks + // Intentions provides intention updates on a notification channel. + Intentions Intentions + // IntentionUpstreams provides intention-inferred upstream updates on a + // notification channel. + IntentionUpstreams IntentionUpstreams + // InternalServiceDump provides updates about a (gateway) service on a + // notification channel. + InternalServiceDump InternalServiceDump + // LeafCertificate provides updates about the service's leaf certificate on a + // notification channel. + LeafCertificate LeafCertificate + // PreparedQuery provides updates about the results of a prepared query. + PreparedQuery PreparedQuery + // ResolvedServiceConfig provides updates about a service's resolved config. + ResolvedServiceConfig ResolvedServiceConfig + // ServiceList provides updates about the list of all services in a datacenter + // on a notification channel. + ServiceList ServiceList + + DataSourcesEnterprise +} + +// CARoots is the interface used to consume updates about the CA root +// certificates. 
+type CARoots interface { + Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// CompiledDiscoveryChain is the interface used to consume updates about the +// compiled discovery chain for a service. +type CompiledDiscoveryChain interface { + Notify(ctx context.Context, req *structs.DiscoveryChainRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// ConfigEntry is the interface used to consume updates about a single config +// entry. +type ConfigEntry interface { + Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- UpdateEvent) error +} + +// ConfigEntryList is the interface used to consume updates about a list of config +// entries. +type ConfigEntryList interface { + Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- UpdateEvent) error +} + +// Datacenters is the interface used to consume updates about federated +// datacenters. +type Datacenters interface { + Notify(ctx context.Context, req *structs.DatacentersRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// FederationStateListMeshGateways is the interface used to consume updates +// about mesh gateways from the federation state. +type FederationStateListMeshGateways interface { + Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// GatewayServices is the interface used to consume updates about a gateway's +// upstream services. +type GatewayServices interface { + Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// Health is the interface used to consume service health updates. 
+type Health interface { + Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// HTTPChecks is the interface used to consume updates about a service's HTTP +// and gRPC-based checks (in order to determine which paths to expose through +// the proxy). +type HTTPChecks interface { + Notify(ctx context.Context, req *cachetype.ServiceHTTPChecksRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// Intentions is the interface used to consume intention updates. +type Intentions interface { + Notify(ctx context.Context, req *structs.IntentionQueryRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// IntentionUpstreams is the interface used to consume updates about upstreams +// inferred from service intentions. +type IntentionUpstreams interface { + Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// InternalServiceDump is the interface used to consume updates about a (gateway) +// service via the internal ServiceDump RPC. +type InternalServiceDump interface { + Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// LeafCertificate is the interface used to consume updates about a service's +// leaf certificate. +type LeafCertificate interface { + Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// PreparedQuery is the interface used to consume updates about the results of +// a prepared query. +type PreparedQuery interface { + Notify(ctx context.Context, req *structs.PreparedQueryExecuteRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// ResolvedServiceConfig is the interface used to consume updates about a +// service's resolved config. 
+type ResolvedServiceConfig interface { + Notify(ctx context.Context, req *structs.ServiceConfigRequest, correlationID string, ch chan<- UpdateEvent) error +} + +// ServiceList is the interface used to consume updates about the list of +// all services in a datacenter. +type ServiceList interface { + Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} diff --git a/agent/proxycfg/data_sources_oss.go b/agent/proxycfg/data_sources_oss.go new file mode 100644 index 0000000000..e5e23d1773 --- /dev/null +++ b/agent/proxycfg/data_sources_oss.go @@ -0,0 +1,6 @@ +//go:build !consulent +// +build !consulent + +package proxycfg + +type DataSourcesEnterprise struct{} diff --git a/agent/proxycfg/glue.go b/agent/proxycfg/glue.go deleted file mode 100644 index 8a25da2f81..0000000000 --- a/agent/proxycfg/glue.go +++ /dev/null @@ -1,60 +0,0 @@ -// TODO(agentless): these glue types belong in the agent package, but moving -// them is a little tricky because the proxycfg tests use them. It should be -// easier to break apart once we no longer depend on cache.Notify directly. -package proxycfg - -import ( - "context" - - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/rpcclient/health" - "github.com/hashicorp/consul/agent/structs" -) - -// HealthWrapper wraps health.Client so that the rest of the proxycfg package -// doesn't need to reference cache.UpdateEvent (it will be extracted into a -// shared library in the future). 
-type HealthWrapper struct { - Health *health.Client -} - -func (w *HealthWrapper) Notify( - ctx context.Context, - req structs.ServiceSpecificRequest, - correlationID string, - ch chan<- UpdateEvent, -) error { - return w.Health.Notify(ctx, req, correlationID, dispatchCacheUpdate(ctx, ch)) -} - -// CacheWrapper wraps cache.Cache so that the rest of the proxycfg package -// doesn't need to reference cache.UpdateEvent (it will be extracted into a -// shared library in the future). -type CacheWrapper struct { - Cache *cache.Cache -} - -func (w *CacheWrapper) Notify( - ctx context.Context, - t string, - req cache.Request, - correlationID string, - ch chan<- UpdateEvent, -) error { - return w.Cache.NotifyCallback(ctx, t, req, correlationID, dispatchCacheUpdate(ctx, ch)) -} - -func dispatchCacheUpdate(ctx context.Context, ch chan<- UpdateEvent) cache.Callback { - return func(ctx context.Context, e cache.UpdateEvent) { - u := UpdateEvent{ - CorrelationID: e.CorrelationID, - Result: e.Result, - Err: e.Err, - } - - select { - case ch <- u: - case <-ctx.Done(): - } - } -} diff --git a/agent/proxycfg/ingress_gateway.go b/agent/proxycfg/ingress_gateway.go index 5ae2998663..3fb67ddabc 100644 --- a/agent/proxycfg/ingress_gateway.go +++ b/agent/proxycfg/ingress_gateway.go @@ -15,7 +15,7 @@ type handlerIngressGateway struct { func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) // Watch for root changes - err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ + err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -25,7 +25,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, } // Get information about the entire service mesh. 
- err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ + err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ Kind: structs.MeshConfig, Name: structs.MeshConfigMesh, Datacenter: s.source.Datacenter, @@ -37,7 +37,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, } // Watch this ingress gateway's config entry - err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ + err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ Kind: structs.IngressGateway, Name: s.service, Datacenter: s.source.Datacenter, @@ -49,7 +49,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, } // Watch the ingress-gateway's list of upstreams - err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{ + err = s.dataSources.GatewayServices.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: s.service, @@ -195,7 +195,7 @@ func (s *handlerIngressGateway) watchIngressLeafCert(ctx context.Context, snap * snap.IngressGateway.LeafCertWatchCancel() } ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{ + err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{ Datacenter: s.source.Datacenter, Token: s.token, Service: s.service, diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 5723808f5f..3de11b3f8a 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -57,11 +57,9 @@ type Manager struct { // panic. The ManagerConfig is passed by value to NewManager so the passed value // can be mutated safely. type ManagerConfig struct { - // Cache is the agent's cache instance that can be used to retrieve, store and - // monitor state for the proxies. 
- Cache CacheNotifier - // Health provides service health updates on a notification channel. - Health Health + // DataSources contains the dependencies used to consume data used to configure + // proxies. + DataSources DataSources // source describes the current agent's identity, it's used directly for // prepared query discovery but also indirectly as a way to pass current // Datacenter name into other request types that need it. This is sufficient @@ -81,7 +79,7 @@ type ManagerConfig struct { // NewManager constructs a Manager. func NewManager(cfg ManagerConfig) (*Manager, error) { - if cfg.Cache == nil || cfg.Source == nil || cfg.Logger == nil { + if cfg.Source == nil || cfg.Logger == nil { return nil, errors.New("all ManagerConfig fields must be provided") } m := &Manager{ @@ -135,8 +133,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour // TODO: move to a function that translates ManagerConfig->stateConfig stateConfig := stateConfig{ logger: m.Logger.With("service_id", id.String()), - cache: m.Cache, - health: m.Health, + dataSources: m.DataSources, source: m.Source, dnsConfig: m.DNSConfig, intentionDefaultAllow: m.IntentionDefaultAllow, diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 2e8b305d1d..402b48bc82 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -2,20 +2,16 @@ package proxycfg import ( "context" - "path" "testing" "time" "github.com/mitchellh/copystructure" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" - "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" @@ -31,25 +27,22 @@ 
func mustCopyProxyConfig(t *testing.T, ns *structs.NodeService) structs.ConnectP // assertLastReqArgs verifies that each request type had the correct source // parameters (e.g. Datacenter name) and token. -func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) { +func assertLastReqArgs(t *testing.T, dataSources *TestDataSources, token string, source *structs.QuerySource) { t.Helper() // Roots needs correct DC and token - rootReq := types.roots.lastReq.Load() - require.IsType(t, rootReq, &structs.DCSpecificRequest{}) - require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token) - require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter) + rootReq := dataSources.CARoots.LastReq() + require.Equal(t, token, rootReq.Token) + require.Equal(t, source.Datacenter, rootReq.Datacenter) // Leaf needs correct DC and token - leafReq := types.leaf.lastReq.Load() - require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{}) - require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token) - require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter) + leafReq := dataSources.LeafCertificate.LastReq() + require.Equal(t, token, leafReq.Token) + require.Equal(t, source.Datacenter, leafReq.Datacenter) // Intentions needs correct DC and token - intReq := types.intentions.lastReq.Load() - require.IsType(t, intReq, &structs.IntentionQueryRequest{}) - require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token) - require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter) + intReq := dataSources.Intentions.LastReq() + require.Equal(t, token, intReq.Token) + require.Equal(t, source.Datacenter, intReq.Datacenter) } func TestManager_BasicLifecycle(t *testing.T) { @@ -125,16 +118,17 @@ func TestManager_BasicLifecycle(t *testing.T) { }, } - rootsCacheKey := testGenCacheKey(&structs.DCSpecificRequest{ + rootsReq := 
&structs.DCSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token"}, - }) - leafCacheKey := testGenCacheKey(&cachetype.ConnectCALeafRequest{ + } + leafReq := &cachetype.ConnectCALeafRequest{ Datacenter: "dc1", Token: "my-token", Service: "web", - }) - intentionCacheKey := testGenCacheKey(&structs.IntentionQueryRequest{ + } + + intentionReq := &structs.IntentionQueryRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token"}, Match: &structs.IntentionQueryMatch{ @@ -147,16 +141,17 @@ func TestManager_BasicLifecycle(t *testing.T) { }, }, }, - }) - meshCacheKey := testGenCacheKey(&structs.ConfigEntryQuery{ + } + + meshConfigReq := &structs.ConfigEntryQuery{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token"}, Kind: structs.MeshConfig, Name: structs.MeshConfigMesh, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }) + } - dbChainCacheKey := testGenCacheKey(&structs.DiscoveryChainRequest{ + dbChainReq := &structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -166,16 +161,16 @@ func TestManager_BasicLifecycle(t *testing.T) { OverrideConnectTimeout: 1 * time.Second, Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token"}, - }) + } - dbHealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{ + dbHealthReq := &structs.ServiceSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token", Filter: ""}, ServiceName: "db", Connect: true, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }) - db_v1_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{ + } + db_v1_HealthReq := &structs.ServiceSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token", Filter: "Service.Meta.version == v1", @@ -183,8 +178,8 @@ func TestManager_BasicLifecycle(t *testing.T) { ServiceName: "db", Connect: true, EnterpriseMeta: 
*structs.DefaultEnterpriseMetaInDefaultPartition(), - }) - db_v2_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{ + } + db_v2_HealthReq := &structs.ServiceSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Token: "my-token", Filter: "Service.Meta.version == v2", @@ -192,7 +187,7 @@ func TestManager_BasicLifecycle(t *testing.T) { ServiceName: "db", Connect: true, EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }) + } db := structs.NewServiceName("db", nil) dbUID := NewUpstreamIDFromServiceName(db) @@ -201,12 +196,12 @@ func TestManager_BasicLifecycle(t *testing.T) { tests := []*testcase_BasicLifecycle{ { name: "simple-default-resolver", - setup: func(t *testing.T, types *TestCacheTypes) { + setup: func(t *testing.T, dataSources *TestDataSources) { // Note that we deliberately leave the 'geo-cache' prepared query to time out - types.health.Set(dbHealthCacheKey, &structs.IndexedCheckServiceNodes{ + dataSources.Health.Set(dbHealthReq, &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodes(t, db.Name), }) - types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{ + dataSources.CompiledDiscoveryChain.Set(dbChainReq, &structs.DiscoveryChainResponse{ Chain: dbDefaultChain(), }) }, @@ -257,15 +252,15 @@ func TestManager_BasicLifecycle(t *testing.T) { }, { name: "chain-resolver-with-version-split", - setup: func(t *testing.T, types *TestCacheTypes) { + setup: func(t *testing.T, dataSources *TestDataSources) { // Note that we deliberately leave the 'geo-cache' prepared query to time out - types.health.Set(db_v1_HealthCacheKey, &structs.IndexedCheckServiceNodes{ + dataSources.Health.Set(db_v1_HealthReq, &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodes(t, db.Name), }) - types.health.Set(db_v2_HealthCacheKey, &structs.IndexedCheckServiceNodes{ + dataSources.Health.Set(db_v2_HealthReq, &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesAlternate(t), }) - 
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{ + dataSources.CompiledDiscoveryChain.Set(dbChainReq, &structs.DiscoveryChainResponse{ Chain: dbSplitChain(), }) }, @@ -322,15 +317,13 @@ func TestManager_BasicLifecycle(t *testing.T) { require.NotNil(t, tt.setup) require.NotNil(t, tt.expectSnap) - // Use a mocked cache to make life simpler - types := NewTestCacheTypes(t) - // Setup initial values - types.roots.Set(rootsCacheKey, roots) - types.leaf.Set(leafCacheKey, leaf) - types.intentions.Set(intentionCacheKey, TestIntentions()) - types.configEntry.Set(meshCacheKey, &structs.ConfigEntryResponse{Entry: nil}) - tt.setup(t, types) + dataSources := NewTestDataSources() + dataSources.LeafCertificate.Set(leafReq, leaf) + dataSources.CARoots.Set(rootsReq, roots) + dataSources.Intentions.Set(intentionReq, TestIntentions()) + dataSources.ConfigEntry.Set(meshConfigReq, &structs.ConfigEntryResponse{Entry: nil}) + tt.setup(t, dataSources) expectSnapCopy, err := copystructure.Copy(tt.expectSnap) require.NoError(t, err) @@ -338,8 +331,9 @@ func TestManager_BasicLifecycle(t *testing.T) { webProxyCopy, err := copystructure.Copy(webProxy) require.NoError(t, err) - testManager_BasicLifecycle(t, types, - rootsCacheKey, leafCacheKey, + testManager_BasicLifecycle(t, + dataSources, + rootsReq, leafReq, roots, webProxyCopy.(*structs.NodeService), expectSnapCopy.(*ConfigSnapshot), @@ -350,30 +344,28 @@ func TestManager_BasicLifecycle(t *testing.T) { type testcase_BasicLifecycle struct { name string - setup func(t *testing.T, types *TestCacheTypes) + setup func(t *testing.T, dataSources *TestDataSources) webProxy *structs.NodeService expectSnap *ConfigSnapshot } func testManager_BasicLifecycle( t *testing.T, - types *TestCacheTypes, - rootsCacheKey, leafCacheKey string, + dataSources *TestDataSources, + rootsReq *structs.DCSpecificRequest, + leafReq *cachetype.ConnectCALeafRequest, roots *structs.IndexedCARoots, webProxy *structs.NodeService, expectSnap 
*ConfigSnapshot, ) { - c := TestCacheWithTypes(t, types) - logger := testutil.Logger(t) source := &structs.QuerySource{Datacenter: "dc1"} // Create manager m, err := NewManager(ManagerConfig{ - Cache: &CacheWrapper{c}, - Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}}, - Source: source, - Logger: logger, + Source: source, + Logger: logger, + DataSources: dataSources.ToDataSources(), }) require.NoError(t, err) @@ -396,7 +388,7 @@ func testManager_BasicLifecycle( assertWatchChanRecvs(t, wCh, expectSnap) require.True(t, time.Since(start) >= coalesceTimeout) - assertLastReqArgs(t, types, "my-token", source) + assertLastReqArgs(t, dataSources, "my-token", source) // Update NodeConfig webProxy.Port = 7777 @@ -420,11 +412,11 @@ func testManager_BasicLifecycle( // This is actually sort of timing dependent - the cache background fetcher // will still be fetching with the old token, but we rely on the fact that our // mock type will have been blocked on those for a while. - assertLastReqArgs(t, types, "other-token", source) + assertLastReqArgs(t, dataSources, "other-token", source) // Update roots newRoots, newLeaf := TestCerts(t) newRoots.Roots = append(newRoots.Roots, roots.Roots...) 
- types.roots.Set(rootsCacheKey, newRoots) + dataSources.CARoots.Set(rootsReq, newRoots) // Expect new roots in snapshot expectSnap.Roots = newRoots @@ -432,7 +424,7 @@ func testManager_BasicLifecycle( assertWatchChanRecvs(t, wCh2, expectSnap) // Update leaf - types.leaf.Set(leafCacheKey, newLeaf) + dataSources.LeafCertificate.Set(leafReq, newLeaf) // Expect new roots in snapshot expectSnap.ConnectProxy.Leaf = newLeaf @@ -500,7 +492,6 @@ func TestManager_deliverLatest(t *testing.T) { // None of these need to do anything to test this method just be valid logger := testutil.Logger(t) cfg := ManagerConfig{ - Cache: &CacheWrapper{cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})}, Source: &structs.QuerySource{ Node: "node1", Datacenter: "dc1", @@ -555,21 +546,14 @@ func TestManager_deliverLatest(t *testing.T) { require.Equal(t, snap2, <-ch5) } -func testGenCacheKey(req cache.Request) string { - info := req.CacheInfo() - return path.Join(info.Key, info.Datacenter) -} - func TestManager_SyncState_No_Notify(t *testing.T) { - types := NewTestCacheTypes(t) - c := TestCacheWithTypes(t, types) + dataSources := NewTestDataSources() logger := testutil.Logger(t) m, err := NewManager(ManagerConfig{ - Cache: &CacheWrapper{c}, - Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}}, - Source: &structs.QuerySource{Datacenter: "dc1"}, - Logger: logger, + Source: &structs.QuerySource{Datacenter: "dc1"}, + Logger: logger, + DataSources: dataSources.ToDataSources(), }) require.NoError(t, err) defer m.Close() diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index fb212a2f40..45b80b5524 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -6,7 +6,6 @@ import ( "strings" "time" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" ) @@ -19,7 +18,7 @@ type handlerMeshGateway struct { 
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) // Watch for root changes - err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ + err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -34,7 +33,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er // Eventually we will have to watch connect enabled instances for each service as well as the // destination services themselves but those notifications will be setup later. // We cannot setup those watches until we know what the services are. - err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{ + err = s.dataSources.ServiceList.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -46,7 +45,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er } // Watch service-resolvers so we can setup service subset clusters - err = s.cache.Notify(ctx, cachetype.ConfigEntryListName, &structs.ConfigEntryQuery{ + err = s.dataSources.ConfigEntryList.Notify(ctx, &structs.ConfigEntryQuery{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Kind: structs.ServiceResolver, @@ -85,7 +84,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error // Conveniently we can just use this service meta attribute in one // place here to set the machinery in motion and leave the conditional // behavior out of the rest of the package. 
- err := s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{ + err := s.dataSources.FederationStateListMeshGateways.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -94,7 +93,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error return err } - err = s.health.Notify(ctx, structs.ServiceSpecificRequest{ + err = s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: structs.ConsulServiceName, @@ -104,7 +103,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error } } - err := s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{ + err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{ QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second}, }, datacentersWatchID, s.ch) if err != nil { @@ -168,7 +167,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ + err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Name, @@ -220,7 +219,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn if _, ok := snap.MeshGateway.WatchedGateways[gk.String()]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ + err := s.dataSources.InternalServiceDump.Notify(ctx, &structs.ServiceDumpRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceKind: 
structs.ServiceKindMeshGateway, diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 5a844fea90..4ac17ebd7f 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -11,29 +11,11 @@ import ( "github.com/hashicorp/go-hclog" "github.com/mitchellh/copystructure" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" ) -// UpdateEvent contains new data for a resource we are subscribed to (e.g. an -// agent cache entry). -type UpdateEvent struct { - CorrelationID string - Result interface{} - Err error -} - -type CacheNotifier interface { - Notify(ctx context.Context, t string, r cache.Request, - correlationID string, ch chan<- UpdateEvent) error -} - -type Health interface { - Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error -} - const ( coalesceTimeout = 200 * time.Millisecond rootsWatchID = "roots" @@ -61,8 +43,7 @@ const ( type stateConfig struct { logger hclog.Logger source *structs.QuerySource - cache CacheNotifier - health Health + dataSources DataSources dnsConfig DNSConfig serverSNIFn ServerSNIFunc intentionDefaultAllow bool @@ -458,16 +439,16 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C } type gatewayWatchOpts struct { - notifier CacheNotifier - notifyCh chan UpdateEvent - source structs.QuerySource - token string - key GatewayKey - upstreamID UpstreamID + internalServiceDump InternalServiceDump + notifyCh chan UpdateEvent + source structs.QuerySource + token string + key GatewayKey + upstreamID UpstreamID } func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { - return opts.notifier.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ + return opts.internalServiceDump.Notify(ctx, &structs.ServiceDumpRequest{ Datacenter: opts.key.Datacenter, QueryOptions: 
structs.QueryOptions{Token: opts.token}, ServiceKind: structs.ServiceKindMeshGateway, diff --git a/agent/proxycfg/state_oss_test.go b/agent/proxycfg/state_oss_test.go new file mode 100644 index 0000000000..0034041f74 --- /dev/null +++ b/agent/proxycfg/state_oss_test.go @@ -0,0 +1,6 @@ +//go:build !consulent +// +build !consulent + +package proxycfg + +func recordWatchesEnterprise(*stateConfig, *watchRecorder) {} diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index a165b263ed..a2fa5914fc 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -11,10 +11,8 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/consul/discoverychain" - "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" ) @@ -114,100 +112,89 @@ func TestStateChanged(t *testing.T) { } } -type testCacheNotifierRequest struct { - cacheType string - request cache.Request - cb func(UpdateEvent) +func recordWatches(sc *stateConfig) *watchRecorder { + wr := newWatchRecorder() + + sc.dataSources = DataSources{ + CARoots: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, + CompiledDiscoveryChain: typedWatchRecorder[*structs.DiscoveryChainRequest]{wr}, + ConfigEntry: typedWatchRecorder[*structs.ConfigEntryQuery]{wr}, + ConfigEntryList: typedWatchRecorder[*structs.ConfigEntryQuery]{wr}, + Datacenters: typedWatchRecorder[*structs.DatacentersRequest]{wr}, + FederationStateListMeshGateways: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, + GatewayServices: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr}, + Health: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr}, + HTTPChecks: typedWatchRecorder[*cachetype.ServiceHTTPChecksRequest]{wr}, + Intentions: 
typedWatchRecorder[*structs.IntentionQueryRequest]{wr}, + IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr}, + InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr}, + LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr}, + PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr}, + ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr}, + ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, + } + recordWatchesEnterprise(sc, wr) + + return wr } -type testCacheNotifier struct { - lock sync.RWMutex - notifiers map[string]testCacheNotifierRequest -} - -func newTestCacheNotifier() *testCacheNotifier { - return &testCacheNotifier{ - notifiers: make(map[string]testCacheNotifierRequest), +func newWatchRecorder() *watchRecorder { + return &watchRecorder{ + watches: make(map[string]any), } } -func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- UpdateEvent) error { - cn.lock.Lock() - cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { ch <- event }} - cn.lock.Unlock() +type watchRecorder struct { + mu sync.Mutex + watches map[string]any +} + +func (r *watchRecorder) record(correlationID string, req any) { + r.mu.Lock() + r.watches[correlationID] = req + r.mu.Unlock() +} + +func (r *watchRecorder) verify(t *testing.T, correlationID string, verifyFn verifyWatchRequest) { + t.Helper() + + r.mu.Lock() + req, ok := r.watches[correlationID] + r.mu.Unlock() + + require.True(t, ok, "No such watch for Correlation ID: %q", correlationID) + + if verifyFn != nil { + verifyFn(t, req) + } +} + +type typedWatchRecorder[ReqType any] struct { + recorder *watchRecorder +} + +func (r typedWatchRecorder[ReqType]) Notify(_ context.Context, req ReqType, correlationID string, _ chan<- UpdateEvent) error { + r.recorder.record(correlationID, req) return nil } -// NotifyCallback 
satisfies the health.CacheGetter interface. -func (cn *testCacheNotifier) NotifyCallback(ctx context.Context, t string, r cache.Request, correlationId string, cb cache.Callback) error { - cn.lock.Lock() - cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { - cb(ctx, cache.UpdateEvent{ - CorrelationID: event.CorrelationID, - Result: event.Result, - Err: event.Err, - }) - }} - cn.lock.Unlock() - return nil -} - -func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) { - panic("Get: not implemented") -} - -func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest { - cn.lock.RLock() - req, ok := cn.notifiers[correlationId] - cn.lock.RUnlock() - require.True(t, ok, "Correlation ID: %s is missing", correlationId) - return req -} - -func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event UpdateEvent) { - req := cn.getNotifierRequest(t, correlationId) - require.NotNil(t, req.cb) - req.cb(event) -} - -func (cn *testCacheNotifier) verifyWatch(t testing.TB, correlationId string) (string, cache.Request) { - // t.Logf("Watches: %+v", cn.notifiers) - req := cn.getNotifierRequest(t, correlationId) - require.NotNil(t, req.cb) - return req.cacheType, req.request -} - -type verifyWatchRequest func(t testing.TB, cacheType string, request cache.Request) - -func genVerifyDCSpecificWatch(expectedCacheType string, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, expectedCacheType, cacheType) +type verifyWatchRequest func(t testing.TB, request any) +func genVerifyDCSpecificWatch(expectedDatacenter string) verifyWatchRequest { + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.DCSpecificRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) } } -func 
genVerifyRootsWatch(expectedDatacenter string) verifyWatchRequest { - return genVerifyDCSpecificWatch(cachetype.ConnectCARootName, expectedDatacenter) -} - -func genVerifyListServicesWatch(expectedDatacenter string) verifyWatchRequest { - return genVerifyDCSpecificWatch(cachetype.CatalogServiceListName, expectedDatacenter) -} - -func verifyDatacentersWatch(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.CatalogDatacentersName, cacheType) - +func verifyDatacentersWatch(t testing.TB, request any) { _, ok := request.(*structs.DatacentersRequest) require.True(t, ok) } func genVerifyLeafWatchWithDNSSANs(expectedService string, expectedDatacenter string, expectedDNSSANs []string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.ConnectCALeafName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*cachetype.ConnectCALeafRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -221,9 +208,7 @@ func genVerifyLeafWatch(expectedService string, expectedDatacenter string) verif } func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.ConfigEntryName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.ConfigEntryQuery) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -233,9 +218,7 @@ func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind st } func genVerifyResolvedConfigWatch(expectedService string, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.ResolvedServiceConfigName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := 
request.(*structs.ServiceConfigRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -244,9 +227,7 @@ func genVerifyResolvedConfigWatch(expectedService string, expectedDatacenter str } func genVerifyIntentionWatch(expectedService string, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.IntentionMatchName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.IntentionQueryRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -258,21 +239,8 @@ func genVerifyIntentionWatch(expectedService string, expectedDatacenter string) } } -func genVerifyIntentionUpstreamsWatch(expectedService string, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.IntentionUpstreamsName, cacheType) - - reqReal, ok := request.(*structs.ServiceSpecificRequest) - require.True(t, ok) - require.Equal(t, expectedDatacenter, reqReal.Datacenter) - require.Equal(t, expectedService, reqReal.ServiceName) - } -} - func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.PreparedQueryName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.PreparedQueryExecuteRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -282,9 +250,7 @@ func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) } func genVerifyDiscoveryChainWatch(expected *structs.DiscoveryChainRequest) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.CompiledDiscoveryChainName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok 
:= request.(*structs.DiscoveryChainRequest) require.True(t, ok) require.Equal(t, expected, reqReal) @@ -292,9 +258,7 @@ func genVerifyDiscoveryChainWatch(expected *structs.DiscoveryChainRequest) verif } func genVerifyMeshConfigWatch(expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.ConfigEntryName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.ConfigEntryQuery) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -304,9 +268,7 @@ func genVerifyMeshConfigWatch(expectedDatacenter string) verifyWatchRequest { } func genVerifyGatewayWatch(expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.InternalServiceDumpName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.ServiceDumpRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -316,10 +278,8 @@ func genVerifyGatewayWatch(expectedDatacenter string) verifyWatchRequest { } } -func genVerifyServiceSpecificRequest(expectedCacheType, expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, expectedCacheType, cacheType) - +func genVerifyServiceSpecificRequest(expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest { + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.ServiceSpecificRequest) require.True(t, ok) require.Equal(t, expectedDatacenter, reqReal.Datacenter) @@ -329,18 +289,12 @@ func genVerifyServiceSpecificRequest(expectedCacheType, expectedService, expecte } } -func genVerifyServiceWatch(expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest { - return 
genVerifyServiceSpecificRequest(cachetype.HealthServicesName, expectedService, expectedFilter, expectedDatacenter, connect) -} - func genVerifyGatewayServiceWatch(expectedService, expectedDatacenter string) verifyWatchRequest { - return genVerifyServiceSpecificRequest(cachetype.GatewayServicesName, expectedService, "", expectedDatacenter, false) + return genVerifyServiceSpecificRequest(expectedService, "", expectedDatacenter, false) } func genVerifyConfigEntryWatch(expectedKind, expectedName, expectedDatacenter string) verifyWatchRequest { - return func(t testing.TB, cacheType string, request cache.Request) { - require.Equal(t, cachetype.ConfigEntryName, cacheType) - + return func(t testing.TB, request any) { reqReal, ok := request.(*structs.ConfigEntryQuery) require.True(t, ok) require.Equal(t, expectedKind, reqReal.Kind) @@ -507,11 +461,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage0 := verificationStage{ requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), - "upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", @@ -562,6 +513,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: meshGatewayProxyConfigValue, }, }), + "upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), }, events: []UpdateEvent{ rootWatchEvent(), @@ -660,10 +614,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage1 := verificationStage{ requiredWatches: 
map[string]verifyWatchRequest{ - fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceWatch("api", "", "dc1", true), - fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true), + fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceSpecificRequest("api", "", "dc1", true), + fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-remote", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-local", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-direct", "", "dc2", true), fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"), fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"), }, @@ -743,9 +697,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - serviceListWatchID: genVerifyListServicesWatch("dc1"), datacentersWatchID: verifyDatacentersWatch, + serviceListWatchID: genVerifyDCSpecificWatch("dc1"), + rootsWatchID: 
genVerifyDCSpecificWatch("dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "gateway without root is not valid") @@ -804,9 +758,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - serviceListWatchID: genVerifyListServicesWatch("dc1"), datacentersWatchID: verifyDatacentersWatch, + serviceListWatchID: genVerifyDCSpecificWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), }, events: []UpdateEvent{ rootWatchEvent(), @@ -948,9 +902,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -1061,7 +1015,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true), + "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceSpecificRequest("api", "", "dc1", true), }, events: []UpdateEvent{ { @@ -1117,9 +1071,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), }, 
events: []UpdateEvent{ @@ -1197,9 +1151,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), }, events: []UpdateEvent{ @@ -1290,10 +1244,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), - gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID, - "terminating-gateway", "", "dc1", false), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + gatewayServicesWatchID: genVerifyGatewayServiceWatch("terminating-gateway", "dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "gateway without root is not valid") @@ -1333,10 +1286,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), - gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID, - "terminating-gateway", "", "dc1", false), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + gatewayServicesWatchID: genVerifyGatewayServiceWatch("terminating-gateway", "dc1"), }, events: []UpdateEvent{ rootWatchEvent(), @@ -1425,7 +1377,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "external-service:" + db.String(): 
genVerifyServiceWatch("db", "", "dc1", false), + "external-service:" + db.String(): genVerifyServiceSpecificRequest("db", "", "dc1", false), }, events: []UpdateEvent{ { @@ -1470,7 +1422,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false), + "external-service:" + api.String(): genVerifyServiceSpecificRequest("api", "", "dc1", false), }, events: []UpdateEvent{ { @@ -1706,12 +1658,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { stages: []verificationStage{ { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") @@ -1792,12 +1743,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Empty on initialization { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", 
"dc1", false), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") @@ -1852,11 +1802,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Receiving an intention should lead to spinning up a discovery chain watch { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, events: []UpdateEvent{ { @@ -1917,7 +1866,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true), + "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceSpecificRequest("db", "", "dc1", true), }, events: []UpdateEvent{ { @@ -2284,11 +2233,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { { // Empty list of upstreams should clean up map keys requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false), + 
rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, events: []UpdateEvent{ { @@ -2351,12 +2299,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Empty on initialization { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), - meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", @@ -2365,6 +2310,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { Datacenter: "dc1", OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, }), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") @@ -2452,11 +2399,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { // be deleted from the snapshot. 
{ requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, - "api", "", "dc1", false), - leafWatchID: genVerifyLeafWatch("api", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false), "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", @@ -2465,6 +2409,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { Datacenter: "dc1", OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, }), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), }, events: []UpdateEvent{ { @@ -2526,8 +2472,6 @@ func TestState_WatchesAndUpdates(t *testing.T) { // First evaluate peered upstream { requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", @@ -2535,6 +2479,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { EvaluateInPartition: "default", Datacenter: "dc1", }), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), // No Peering watch }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -2604,12 +2550,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { - cn := newTestCacheNotifier() proxyID := ProxyID{ServiceID: tc.ns.CompoundServiceID()} - state, err := newState(proxyID, &tc.ns, testSource, "", stateConfig{ + + sc := stateConfig{ logger: testutil.Logger(t), - cache: cn, - health: 
&HealthWrapper{&health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}}, source: &structs.QuerySource{ Datacenter: tc.sourceDC, }, @@ -2617,7 +2561,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { Domain: "consul.", AltDomain: "alt.consul.", }, - }) + } + wr := recordWatches(&sc) + + state, err := newState(proxyID, &tc.ns, testSource, "", sc) // verify building the initial state worked require.NoError(t, err) @@ -2644,13 +2591,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, t.Run(fmt.Sprintf("stage-%d", idx), func(t *testing.T) { for correlationId, verifier := range stage.requiredWatches { require.True(t, t.Run(correlationId, func(t *testing.T) { - // verify that the watch was initiated - cacheType, request := cn.verifyWatch(t, correlationId) - - // run the verifier if any - if verifier != nil { - verifier(t, cacheType, request) - } + wr.verify(t, correlationId, verifier) })) } diff --git a/agent/proxycfg/terminating_gateway.go b/agent/proxycfg/terminating_gateway.go index b27dbe8c07..a9dbdb3916 100644 --- a/agent/proxycfg/terminating_gateway.go +++ b/agent/proxycfg/terminating_gateway.go @@ -17,7 +17,7 @@ type handlerTerminatingGateway struct { func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) // Watch for root changes - err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ + err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Source: *s.source, @@ -28,7 +28,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps } // Get information about the entire service mesh. 
- err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ + err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ Kind: structs.MeshConfig, Name: structs.MeshConfigMesh, Datacenter: s.source.Datacenter, @@ -40,7 +40,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps } // Watch for the terminating-gateway's linked services - err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{ + err = s.dataSources.GatewayServices.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: s.service, @@ -116,7 +116,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv // Watch the health endpoint to discover endpoints for the service if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ + err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Service.Name, @@ -141,7 +141,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv // The gateway will enforce intentions for connections to the service if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{ + err := s.dataSources.Intentions.Notify(ctx, &structs.IntentionQueryRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Match: &structs.IntentionQueryMatch{ @@ -171,7 +171,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv // This cert is used to terminate mTLS connections on the service's behalf if _, ok := 
snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{ + err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{ Datacenter: s.source.Datacenter, Token: s.token, Service: svc.Service.Name, @@ -193,7 +193,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv // These are used to determine the protocol for the target service. if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{ + err := s.dataSources.ResolvedServiceConfig.Notify(ctx, &structs.ServiceConfigRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Name: svc.Service.Name, @@ -215,7 +215,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv // These are used to create clusters and endpoints for the service subsets if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok { ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ + err := s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Kind: structs.ServiceResolver, diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 73320108d5..1edbfd0a3e 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -22,53 +22,6 @@ import ( "github.com/hashicorp/consul/api" ) -// TestCacheTypes encapsulates all the different cache types proxycfg.State will -// watch/request for controlling one during testing. 
-type TestCacheTypes struct { - roots *ControllableCacheType - leaf *ControllableCacheType - intentions *ControllableCacheType - health *ControllableCacheType - query *ControllableCacheType - compiledChain *ControllableCacheType - serviceHTTPChecks *ControllableCacheType - configEntry *ControllableCacheType -} - -// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that -// proxycfg will watch suitable for testing a proxycfg.State or Manager. -func NewTestCacheTypes(t testing.T) *TestCacheTypes { - t.Helper() - ct := &TestCacheTypes{ - roots: NewControllableCacheType(t), - leaf: NewControllableCacheType(t), - intentions: NewControllableCacheType(t), - health: NewControllableCacheType(t), - query: NewControllableCacheType(t), - compiledChain: NewControllableCacheType(t), - serviceHTTPChecks: NewControllableCacheType(t), - configEntry: NewControllableCacheType(t), - } - ct.query.blocking = false - return ct -} - -// TestCacheWithTypes registers ControllableCacheTypes for all types that -// proxycfg will watch suitable for testing a proxycfg.State or Manager. -func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache { - c := cache.New(cache.Options{}) - c.RegisterType(cachetype.ConnectCARootName, types.roots) - c.RegisterType(cachetype.ConnectCALeafName, types.leaf) - c.RegisterType(cachetype.IntentionMatchName, types.intentions) - c.RegisterType(cachetype.HealthServicesName, types.health) - c.RegisterType(cachetype.PreparedQueryName, types.query) - c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain) - c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks) - c.RegisterType(cachetype.ConfigEntryName, types.configEntry) - - return c -} - // TestCerts generates a CA and Leaf suitable for returning as mock CA // root/leaf cache requests. 
func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) { @@ -668,19 +621,9 @@ func TestGatewayServiceGroupFooDC1(t testing.T) structs.CheckServiceNodes { } } -type noopCacheNotifier struct{} +type noopDataSource[ReqType any] struct{} -var _ CacheNotifier = (*noopCacheNotifier)(nil) - -func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- UpdateEvent) error { - return nil -} - -type noopHealth struct{} - -var _ Health = (*noopHealth)(nil) - -func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- UpdateEvent) error { +func (*noopDataSource[ReqType]) Notify(context.Context, ReqType, string, chan<- UpdateEvent) error { return nil } @@ -711,8 +654,24 @@ func testConfigSnapshotFixture( source: &structs.QuerySource{ Datacenter: "dc1", }, - cache: &noopCacheNotifier{}, - health: &noopHealth{}, + dataSources: DataSources{ + CARoots: &noopDataSource[*structs.DCSpecificRequest]{}, + CompiledDiscoveryChain: &noopDataSource[*structs.DiscoveryChainRequest]{}, + ConfigEntry: &noopDataSource[*structs.ConfigEntryQuery]{}, + ConfigEntryList: &noopDataSource[*structs.ConfigEntryQuery]{}, + Datacenters: &noopDataSource[*structs.DatacentersRequest]{}, + FederationStateListMeshGateways: &noopDataSource[*structs.DCSpecificRequest]{}, + GatewayServices: &noopDataSource[*structs.ServiceSpecificRequest]{}, + Health: &noopDataSource[*structs.ServiceSpecificRequest]{}, + HTTPChecks: &noopDataSource[*cachetype.ServiceHTTPChecksRequest]{}, + Intentions: &noopDataSource[*structs.IntentionQueryRequest]{}, + IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{}, + InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{}, + LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{}, + PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{}, + ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{}, + ServiceList: 
&noopDataSource[*structs.DCSpecificRequest]{}, + }, dnsConfig: DNSConfig{ // TODO: make configurable Domain: "consul", AltDomain: "", @@ -720,6 +679,7 @@ func testConfigSnapshotFixture( serverSNIFn: serverSNIFn, intentionDefaultAllow: false, // TODO: make configurable } + testConfigSnapshotFixtureEnterprise(&config) s, err := newServiceInstanceFromNodeService(ProxyID{ServiceID: ns.CompoundServiceID()}, ns, token) if err != nil { t.Fatalf("err: %v", err) @@ -889,3 +849,159 @@ func projectRoot() string { _, base, _, _ := runtime.Caller(0) return filepath.Dir(base) } + +// NewTestDataSources creates a set of data sources that can be used to provide +// the Manager with data in tests. +func NewTestDataSources() *TestDataSources { + srcs := &TestDataSources{ + CARoots: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots](), + CompiledDiscoveryChain: NewTestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse](), + ConfigEntry: NewTestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse](), + ConfigEntryList: NewTestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries](), + Datacenters: NewTestDataSource[*structs.DatacentersRequest, *[]string](), + FederationStateListMeshGateways: NewTestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes](), + GatewayServices: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices](), + Health: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes](), + HTTPChecks: NewTestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType](), + Intentions: NewTestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches](), + IntentionUpstreams: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](), + InternalServiceDump: NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways](), + LeafCertificate: 
NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert](), + PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](), + ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](), + ServiceList: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList](), + } + srcs.buildEnterpriseSources() + return srcs +} + +type TestDataSources struct { + CARoots *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots] + CompiledDiscoveryChain *TestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse] + ConfigEntry *TestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse] + ConfigEntryList *TestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries] + FederationStateListMeshGateways *TestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes] + Datacenters *TestDataSource[*structs.DatacentersRequest, *[]string] + GatewayServices *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices] + Health *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes] + HTTPChecks *TestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType] + Intentions *TestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches] + IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList] + InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways] + LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert] + PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse] + ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse] + ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList] + 
+ TestDataSourcesEnterprise +} + +func (t *TestDataSources) ToDataSources() DataSources { + ds := DataSources{ + CARoots: t.CARoots, + CompiledDiscoveryChain: t.CompiledDiscoveryChain, + ConfigEntry: t.ConfigEntry, + ConfigEntryList: t.ConfigEntryList, + Datacenters: t.Datacenters, + FederationStateListMeshGateways: t.FederationStateListMeshGateways, + GatewayServices: t.GatewayServices, + Health: t.Health, + HTTPChecks: t.HTTPChecks, + Intentions: t.Intentions, + IntentionUpstreams: t.IntentionUpstreams, + InternalServiceDump: t.InternalServiceDump, + LeafCertificate: t.LeafCertificate, + PreparedQuery: t.PreparedQuery, + ResolvedServiceConfig: t.ResolvedServiceConfig, + ServiceList: t.ServiceList, + } + t.fillEnterpriseDataSources(&ds) + return ds +} + +// NewTestDataSource creates a test data source that accepts requests to Notify +// of type ReqType and dispatches UpdateEvents with a result of type ValType. +// +// TODO(agentless): we still depend on cache.Request here because it provides the +// CacheInfo method used for hashing the request - this won't work when we extract +// this package into a shared library. +func NewTestDataSource[ReqType cache.Request, ValType any]() *TestDataSource[ReqType, ValType] { + return &TestDataSource[ReqType, ValType]{ + data: make(map[string]ValType), + trigger: make(chan struct{}), + } +} + +type TestDataSource[ReqType cache.Request, ValType any] struct { + mu sync.Mutex + data map[string]ValType + lastReq ReqType + + // Note: trigger is currently global for all requests of the given type, so + // Manager may receive duplicate events - as the dispatch goroutine will be + // woken up whenever *any* requested data changes. + trigger chan struct{} +} + +// Notify satisfies the interfaces used by Manager to subscribe to data. 
+func (t *TestDataSource[ReqType, ValType]) Notify(ctx context.Context, req ReqType, correlationID string, ch chan<- UpdateEvent) error { + t.mu.Lock() + t.lastReq = req + t.mu.Unlock() + + go t.dispatch(ctx, correlationID, t.reqKey(req), ch) + + return nil +} + +func (t *TestDataSource[ReqType, ValType]) dispatch(ctx context.Context, correlationID, key string, ch chan<- UpdateEvent) { + for { + t.mu.Lock() + val, ok := t.data[key] + trigger := t.trigger + t.mu.Unlock() + + if ok { + event := UpdateEvent{ + CorrelationID: correlationID, + Result: val, + } + + select { + case ch <- event: + case <-ctx.Done(): + } + } + + select { + case <-trigger: + case <-ctx.Done(): + return + } + } +} + +func (t *TestDataSource[ReqType, ValType]) reqKey(req ReqType) string { + return req.CacheInfo().Key +} + +// Set broadcasts the given value to consumers that subscribed with the given +// request. +func (t *TestDataSource[ReqType, ValType]) Set(req ReqType, val ValType) error { + t.mu.Lock() + t.data[t.reqKey(req)] = val + oldTrigger := t.trigger + t.trigger = make(chan struct{}) + t.mu.Unlock() + + close(oldTrigger) + + return nil +} + +// LastReq returns the request from the last call to Notify that was received. 
+func (t *TestDataSource[ReqType, ValType]) LastReq() ReqType { + t.mu.Lock() + defer t.mu.Unlock() + + return t.lastReq +} diff --git a/agent/proxycfg/testing_oss.go b/agent/proxycfg/testing_oss.go new file mode 100644 index 0000000000..11f9a6adeb --- /dev/null +++ b/agent/proxycfg/testing_oss.go @@ -0,0 +1,12 @@ +//go:build !consulent +// +build !consulent + +package proxycfg + +type TestDataSourcesEnterprise struct{} + +func (*TestDataSources) buildEnterpriseSources() {} + +func (*TestDataSources) fillEnterpriseDataSources(*DataSources) {} + +func testConfigSnapshotFixtureEnterprise(*stateConfig) {} diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index 1ed36c0e41..3e887caf1c 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -9,7 +9,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/acl" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) @@ -314,12 +313,12 @@ func (s *handlerUpstreams) resetWatchesFromChain( ctx, cancel := context.WithCancel(ctx) opts := gatewayWatchOpts{ - notifier: s.cache, - notifyCh: s.ch, - source: *s.source, - token: s.token, - key: gwKey, - upstreamID: uid, + internalServiceDump: s.dataSources.InternalServiceDump, + notifyCh: s.ch, + source: *s.source, + token: s.token, + key: gwKey, + upstreamID: uid, } err := watchMeshGateway(ctx, opts) if err != nil { @@ -372,7 +371,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String() ctx, cancel := context.WithCancel(ctx) - err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ + err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ PeerName: opts.upstreamID.Peer, Datacenter: opts.datacenter, QueryOptions: structs.QueryOptions{ @@ -413,7 +412,7 @@ func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *Config } 
ctx, cancel := context.WithCancel(ctx) - err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ + err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Name: opts.name,