proxycfg: replace direct agent cache usage with interfaces (#13320)

This is the OSS portion of enterprise PRs 1904, 1905, 1906, 1907, 1949,
and 1971.

It replaces the proxycfg manager's direct dependency on the agent cache
with interfaces that will be implemented differently when serving xDS
sessions from a Consul server.
This commit is contained in:
Dan Upton 2022-06-01 16:18:06 +01:00 committed by GitHub
parent 67860bd248
commit adeabed126
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 787 additions and 469 deletions

View File

@ -20,7 +20,6 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
@ -41,6 +40,7 @@ import (
publicgrpc "github.com/hashicorp/consul/agent/grpc/public"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient/health"
@ -56,6 +56,7 @@ import (
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
)
@ -627,10 +628,28 @@ func (a *Agent) Start(ctx context.Context) error {
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
proxyDataSources := proxycfg.DataSources{
CARoots: proxycfgglue.CacheCARoots(a.cache),
CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache),
ConfigEntry: proxycfgglue.CacheConfigEntry(a.cache),
ConfigEntryList: proxycfgglue.CacheConfigEntryList(a.cache),
Datacenters: proxycfgglue.CacheDatacenters(a.cache),
FederationStateListMeshGateways: proxycfgglue.CacheFederationStateListMeshGateways(a.cache),
GatewayServices: proxycfgglue.CacheGatewayServices(a.cache),
Health: proxycfgglue.Health(a.rpcClientHealth),
HTTPChecks: proxycfgglue.CacheHTTPChecks(a.cache),
Intentions: proxycfgglue.CacheIntentions(a.cache),
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),
ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache),
ServiceList: proxycfgglue.CacheServiceList(a.cache),
}
a.fillEnterpriseProxyDataSources(&proxyDataSources)
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: &proxycfg.CacheWrapper{Cache: a.cache},
Health: &proxycfg.HealthWrapper{Health: a.rpcClientHealth},
Logger: a.logger.Named(logging.ProxyConfig),
DataSources: proxyDataSources,
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
@ -58,3 +59,5 @@ func (a *Agent) AgentEnterpriseMeta() *acl.EnterpriseMeta {
}
func (a *Agent) registerEntCache() {}
func (*Agent) fillEnterpriseProxyDataSources(*proxycfg.DataSources) {}

153
agent/proxycfg-glue/glue.go Normal file
View File

@ -0,0 +1,153 @@
package proxycfgglue
import (
"context"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
)
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
// the agent cache.
func CacheCARoots(c *cache.Cache) proxycfg.CARoots {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName}
}
// CacheCompiledDiscoveryChain satisfies the proxycfg.CompiledDiscoveryChain
// interface by sourcing data from the agent cache.
func CacheCompiledDiscoveryChain(c *cache.Cache) proxycfg.CompiledDiscoveryChain {
return &cacheProxyDataSource[*structs.DiscoveryChainRequest]{c, cachetype.CompiledDiscoveryChainName}
}
// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
// data from the agent cache.
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName}
}
// CacheConfigEntryList satisfies the proxycfg.ConfigEntryList interface by
// sourcing data from the agent cache.
func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName}
}
// CacheDatacenters satisfies the proxycfg.Datacenters interface by sourcing
// data from the agent cache.
func CacheDatacenters(c *cache.Cache) proxycfg.Datacenters {
return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName}
}
// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways
// interface by sourcing data from the agent cache.
func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName}
}
// CacheGatewayServices satisfies the proxycfg.GatewayServices interface by
// sourcing data from the agent cache.
func CacheGatewayServices(c *cache.Cache) proxycfg.GatewayServices {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.GatewayServicesName}
}
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
// data from the agent cache.
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName}
}
// CacheIntentions satisfies the proxycfg.Intentions interface by sourcing data
// from the agent cache.
func CacheIntentions(c *cache.Cache) proxycfg.Intentions {
return &cacheProxyDataSource[*structs.IntentionQueryRequest]{c, cachetype.IntentionMatchName}
}
// CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from the agent cache.
func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName}
}
// CacheInternalServiceDump satisfies the proxycfg.InternalServiceDump
// interface by sourcing data from the agent cache.
func CacheInternalServiceDump(c *cache.Cache) proxycfg.InternalServiceDump {
return &cacheProxyDataSource[*structs.ServiceDumpRequest]{c, cachetype.InternalServiceDumpName}
}
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
// sourcing data from the agent cache.
func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
}
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
// sourcing data from the agent cache.
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {
return &cacheProxyDataSource[*structs.PreparedQueryExecuteRequest]{c, cachetype.PreparedQueryName}
}
// CacheResolvedServiceConfig satisfies the proxycfg.ResolvedServiceConfig
// interface by sourcing data from the agent cache.
func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig {
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
}
// CacheServiceList satisfies the proxycfg.ServiceList interface by sourcing
// data from the agent cache.
func CacheServiceList(c *cache.Cache) proxycfg.ServiceList {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName}
}
// cacheProxyDataSource implements a generic wrapper around the agent cache to
// provide data to the proxycfg.Manager.
type cacheProxyDataSource[ReqType cache.Request] struct {
c *cache.Cache
t string
}
// Notify satisfies the interfaces used by proxycfg.Manager to source data by
// subscribing to notifications from the agent cache.
func (c *cacheProxyDataSource[ReqType]) Notify(
ctx context.Context,
req ReqType,
correlationID string,
ch chan<- proxycfg.UpdateEvent,
) error {
return c.c.NotifyCallback(ctx, c.t, req, correlationID, dispatchCacheUpdate(ctx, ch))
}
// Health wraps health.Client so that the proxycfg package doesn't need to
// reference cache.UpdateEvent directly.
func Health(client *health.Client) proxycfg.Health {
return &healthWrapper{client}
}
type healthWrapper struct {
client *health.Client
}
func (h *healthWrapper) Notify(
ctx context.Context,
req *structs.ServiceSpecificRequest,
correlationID string,
ch chan<- proxycfg.UpdateEvent,
) error {
return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ctx, ch))
}
func dispatchCacheUpdate(ctx context.Context, ch chan<- proxycfg.UpdateEvent) cache.Callback {
return func(ctx context.Context, e cache.UpdateEvent) {
u := proxycfg.UpdateEvent{
CorrelationID: e.CorrelationID,
Result: e.Result,
Err: e.Err,
}
select {
case ch <- u:
case <-ctx.Done():
}
}
}

View File

@ -32,7 +32,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -42,7 +42,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Watch the leaf cert
err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
err = s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: s.proxyCfg.DestinationServiceName,
@ -53,7 +53,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Watch for intention updates
err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
err = s.dataSources.Intentions.Notify(ctx, &structs.IntentionQueryRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Match: &structs.IntentionQueryMatch{
@ -72,7 +72,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Get information about the entire service mesh.
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.MeshConfig,
Name: structs.MeshConfigMesh,
Datacenter: s.source.Datacenter,
@ -84,7 +84,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Watch for service check updates
err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{
ServiceID: s.proxyCfg.DestinationServiceID,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
@ -94,7 +94,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
// When in transparent proxy we will infer upstreams from intentions with this source
err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.proxyCfg.DestinationServiceName,
@ -156,7 +156,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
switch u.DestinationType {
case structs.UpstreamDestTypePreparedQuery:
err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
err = s.dataSources.PreparedQuery.Notify(ctx, &structs.PreparedQueryExecuteRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
QueryIDOrName: u.DestinationName,
@ -196,7 +196,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
continue
}
err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
err = s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: u.DestinationName,

View File

@ -0,0 +1,162 @@
package proxycfg
import (
"context"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
// UpdateEvent contains new data for a resource we are subscribed to (e.g. an
// agent cache entry).
type UpdateEvent struct {
CorrelationID string
Result interface{}
Err error
}
// DataSources contains the dependencies used to consume data used to configure
// proxies.
type DataSources struct {
// CARoots provides updates about the CA root certificates on a notification
// channel.
CARoots CARoots
// CompiledDiscoveryChain provides updates about a service's discovery chain
// on a notification channel.
CompiledDiscoveryChain CompiledDiscoveryChain
// ConfigEntry provides updates about a single config entry on a notification
// channel.
ConfigEntry ConfigEntry
// ConfigEntryList provides updates about a list of config entries on a
// notification channel.
ConfigEntryList ConfigEntryList
// Datacenters provides updates about federated datacenters on a notification
// channel.
Datacenters Datacenters
// FederationStateListMeshGateways is the interface used to consume updates
// about mesh gateways from the federation state.
FederationStateListMeshGateways FederationStateListMeshGateways
// GatewayServices provides updates about a gateway's upstream services on a
// notification channel.
GatewayServices GatewayServices
// Health provides service health updates on a notification channel.
Health Health
// HTTPChecks provides updates about a service's HTTP and gRPC checks on a
// notification channel.
HTTPChecks HTTPChecks
// Intentions provides intention updates on a notification channel.
Intentions Intentions
// IntentionUpstreams provides intention-inferred upstream updates on a
// notification channel.
IntentionUpstreams IntentionUpstreams
// InternalServiceDump provides updates about a (gateway) service on a
// notification channel.
InternalServiceDump InternalServiceDump
// LeafCertificate provides updates about the service's leaf certificate on a
// notification channel.
LeafCertificate LeafCertificate
// PreparedQuery provides updates about the results of a prepared query.
PreparedQuery PreparedQuery
// ResolvedServiceConfig provides updates about a service's resolved config.
ResolvedServiceConfig ResolvedServiceConfig
// ServiceList provides updates about the list of all services in a datacenter
// on a notification channel.
ServiceList ServiceList
DataSourcesEnterprise
}
// CARoots is the interface used to consume updates about the CA root
// certificates.
type CARoots interface {
Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// CompiledDiscoveryChain is the interface used to consume updates about the
// compiled discovery chain for a service.
type CompiledDiscoveryChain interface {
Notify(ctx context.Context, req *structs.DiscoveryChainRequest, correlationID string, ch chan<- UpdateEvent) error
}
// ConfigEntry is the interface used to consume updates about a single config
// entry.
type ConfigEntry interface {
Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- UpdateEvent) error
}
// ConfigEntry is the interface used to consume updates about a list of config
// entries.
type ConfigEntryList interface {
Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- UpdateEvent) error
}
// Datacenters is the interface used to consume updates about federated
// datacenters.
type Datacenters interface {
Notify(ctx context.Context, req *structs.DatacentersRequest, correlationID string, ch chan<- UpdateEvent) error
}
// FederationStateListMeshGateways is the interface used to consume updates
// about mesh gateways from the federation state.
type FederationStateListMeshGateways interface {
Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// GatewayServices is the interface used to consume updates about a gateway's
// upstream services.
type GatewayServices interface {
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// Health is the interface used to consume service health updates.
type Health interface {
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// HTTPChecks is the interface used to consume updates about a service's HTTP
// and gRPC-based checks (in order to determine which paths to expose through
// the proxy).
type HTTPChecks interface {
Notify(ctx context.Context, req *cachetype.ServiceHTTPChecksRequest, correlationID string, ch chan<- UpdateEvent) error
}
// Intentions is the interface used to consume intention updates.
type Intentions interface {
Notify(ctx context.Context, req *structs.IntentionQueryRequest, correlationID string, ch chan<- UpdateEvent) error
}
// IntentionUpstreams is the interface used to consume updates about upstreams
// inferred from service intentions.
type IntentionUpstreams interface {
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// InternalServiceDump is the interface used to consume updates about a (gateway)
// service via the internal ServiceDump RPC.
type InternalServiceDump interface {
Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- UpdateEvent) error
}
// LeafCertificate is the interface used to consume updates about a service's
// leaf certificate.
type LeafCertificate interface {
Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
}
// PreparedQuery is the interface used to consume updates about the results of
// a prepared query.
type PreparedQuery interface {
Notify(ctx context.Context, req *structs.PreparedQueryExecuteRequest, correlationID string, ch chan<- UpdateEvent) error
}
// ResolvedServiceConfig is the interface used to consume updates about a
// service's resolved config.
type ResolvedServiceConfig interface {
Notify(ctx context.Context, req *structs.ServiceConfigRequest, correlationID string, ch chan<- UpdateEvent) error
}
// ServiceList is the interface used to consume updates about the list of
// all services in a datacenter.
type ServiceList interface {
Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}

View File

@ -0,0 +1,6 @@
//go:build !consulent
// +build !consulent
package proxycfg
type DataSourcesEnterprise struct{}

View File

@ -1,60 +0,0 @@
// TODO(agentless): these glue types belong in the agent package, but moving
// them is a little tricky because the proxycfg tests use them. It should be
// easier to break apart once we no longer depend on cache.Notify directly.
package proxycfg
import (
"context"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
)
// HealthWrapper wraps health.Client so that the rest of the proxycfg package
// doesn't need to reference cache.UpdateEvent (it will be extracted into a
// shared library in the future).
type HealthWrapper struct {
Health *health.Client
}
func (w *HealthWrapper) Notify(
ctx context.Context,
req structs.ServiceSpecificRequest,
correlationID string,
ch chan<- UpdateEvent,
) error {
return w.Health.Notify(ctx, req, correlationID, dispatchCacheUpdate(ctx, ch))
}
// CacheWrapper wraps cache.Cache so that the rest of the proxycfg package
// doesn't need to reference cache.UpdateEvent (it will be extracted into a
// shared library in the future).
type CacheWrapper struct {
Cache *cache.Cache
}
func (w *CacheWrapper) Notify(
ctx context.Context,
t string,
req cache.Request,
correlationID string,
ch chan<- UpdateEvent,
) error {
return w.Cache.NotifyCallback(ctx, t, req, correlationID, dispatchCacheUpdate(ctx, ch))
}
func dispatchCacheUpdate(ctx context.Context, ch chan<- UpdateEvent) cache.Callback {
return func(ctx context.Context, e cache.UpdateEvent) {
u := UpdateEvent{
CorrelationID: e.CorrelationID,
Result: e.Result,
Err: e.Err,
}
select {
case ch <- u:
case <-ctx.Done():
}
}
}

View File

@ -15,7 +15,7 @@ type handlerIngressGateway struct {
func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -25,7 +25,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
}
// Get information about the entire service mesh.
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.MeshConfig,
Name: structs.MeshConfigMesh,
Datacenter: s.source.Datacenter,
@ -37,7 +37,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
}
// Watch this ingress gateway's config entry
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.IngressGateway,
Name: s.service,
Datacenter: s.source.Datacenter,
@ -49,7 +49,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
}
// Watch the ingress-gateway's list of upstreams
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
err = s.dataSources.GatewayServices.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.service,
@ -195,7 +195,7 @@ func (s *handlerIngressGateway) watchIngressLeafCert(ctx context.Context, snap *
snap.IngressGateway.LeafCertWatchCancel()
}
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: s.service,

View File

@ -57,11 +57,9 @@ type Manager struct {
// panic. The ManagerConfig is passed by value to NewManager so the passed value
// can be mutated safely.
type ManagerConfig struct {
// Cache is the agent's cache instance that can be used to retrieve, store and
// monitor state for the proxies.
Cache CacheNotifier
// Health provides service health updates on a notification channel.
Health Health
// DataSources contains the dependencies used to consume data used to configure
// proxies.
DataSources DataSources
// source describes the current agent's identity, it's used directly for
// prepared query discovery but also indirectly as a way to pass current
// Datacenter name into other request types that need it. This is sufficient
@ -81,7 +79,7 @@ type ManagerConfig struct {
// NewManager constructs a Manager.
func NewManager(cfg ManagerConfig) (*Manager, error) {
if cfg.Cache == nil || cfg.Source == nil || cfg.Logger == nil {
if cfg.Source == nil || cfg.Logger == nil {
return nil, errors.New("all ManagerConfig fields must be provided")
}
m := &Manager{
@ -135,8 +133,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
// TODO: move to a function that translates ManagerConfig->stateConfig
stateConfig := stateConfig{
logger: m.Logger.With("service_id", id.String()),
cache: m.Cache,
health: m.Health,
dataSources: m.DataSources,
source: m.Source,
dnsConfig: m.DNSConfig,
intentionDefaultAllow: m.IntentionDefaultAllow,

View File

@ -2,20 +2,16 @@ package proxycfg
import (
"context"
"path"
"testing"
"time"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
@ -31,25 +27,22 @@ func mustCopyProxyConfig(t *testing.T, ns *structs.NodeService) structs.ConnectP
// assertLastReqArgs verifies that each request type had the correct source
// parameters (e.g. Datacenter name) and token.
func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) {
func assertLastReqArgs(t *testing.T, dataSources *TestDataSources, token string, source *structs.QuerySource) {
t.Helper()
// Roots needs correct DC and token
rootReq := types.roots.lastReq.Load()
require.IsType(t, rootReq, &structs.DCSpecificRequest{})
require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token)
require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter)
rootReq := dataSources.CARoots.LastReq()
require.Equal(t, token, rootReq.Token)
require.Equal(t, source.Datacenter, rootReq.Datacenter)
// Leaf needs correct DC and token
leafReq := types.leaf.lastReq.Load()
require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{})
require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token)
require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter)
leafReq := dataSources.LeafCertificate.LastReq()
require.Equal(t, token, leafReq.Token)
require.Equal(t, source.Datacenter, leafReq.Datacenter)
// Intentions needs correct DC and token
intReq := types.intentions.lastReq.Load()
require.IsType(t, intReq, &structs.IntentionQueryRequest{})
require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token)
require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter)
intReq := dataSources.Intentions.LastReq()
require.Equal(t, token, intReq.Token)
require.Equal(t, source.Datacenter, intReq.Datacenter)
}
func TestManager_BasicLifecycle(t *testing.T) {
@ -125,16 +118,17 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
}
rootsCacheKey := testGenCacheKey(&structs.DCSpecificRequest{
rootsReq := &structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token"},
})
leafCacheKey := testGenCacheKey(&cachetype.ConnectCALeafRequest{
}
leafReq := &cachetype.ConnectCALeafRequest{
Datacenter: "dc1",
Token: "my-token",
Service: "web",
})
intentionCacheKey := testGenCacheKey(&structs.IntentionQueryRequest{
}
intentionReq := &structs.IntentionQueryRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token"},
Match: &structs.IntentionQueryMatch{
@ -147,16 +141,17 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
},
},
})
meshCacheKey := testGenCacheKey(&structs.ConfigEntryQuery{
}
meshConfigReq := &structs.ConfigEntryQuery{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token"},
Kind: structs.MeshConfig,
Name: structs.MeshConfigMesh,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
})
}
dbChainCacheKey := testGenCacheKey(&structs.DiscoveryChainRequest{
dbChainReq := &structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -166,16 +161,16 @@ func TestManager_BasicLifecycle(t *testing.T) {
OverrideConnectTimeout: 1 * time.Second,
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token"},
})
}
dbHealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
dbHealthReq := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token", Filter: ""},
ServiceName: "db",
Connect: true,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
})
db_v1_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
}
db_v1_HealthReq := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token",
Filter: "Service.Meta.version == v1",
@ -183,8 +178,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
ServiceName: "db",
Connect: true,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
})
db_v2_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
}
db_v2_HealthReq := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token",
Filter: "Service.Meta.version == v2",
@ -192,7 +187,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
ServiceName: "db",
Connect: true,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
})
}
db := structs.NewServiceName("db", nil)
dbUID := NewUpstreamIDFromServiceName(db)
@ -201,12 +196,12 @@ func TestManager_BasicLifecycle(t *testing.T) {
tests := []*testcase_BasicLifecycle{
{
name: "simple-default-resolver",
setup: func(t *testing.T, types *TestCacheTypes) {
setup: func(t *testing.T, dataSources *TestDataSources) {
// Note that we deliberately leave the 'geo-cache' prepared query to time out
types.health.Set(dbHealthCacheKey, &structs.IndexedCheckServiceNodes{
dataSources.Health.Set(dbHealthReq, &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodes(t, db.Name),
})
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
dataSources.CompiledDiscoveryChain.Set(dbChainReq, &structs.DiscoveryChainResponse{
Chain: dbDefaultChain(),
})
},
@ -257,15 +252,15 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
{
name: "chain-resolver-with-version-split",
setup: func(t *testing.T, types *TestCacheTypes) {
setup: func(t *testing.T, dataSources *TestDataSources) {
// Note that we deliberately leave the 'geo-cache' prepared query to time out
types.health.Set(db_v1_HealthCacheKey, &structs.IndexedCheckServiceNodes{
dataSources.Health.Set(db_v1_HealthReq, &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodes(t, db.Name),
})
types.health.Set(db_v2_HealthCacheKey, &structs.IndexedCheckServiceNodes{
dataSources.Health.Set(db_v2_HealthReq, &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesAlternate(t),
})
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
dataSources.CompiledDiscoveryChain.Set(dbChainReq, &structs.DiscoveryChainResponse{
Chain: dbSplitChain(),
})
},
@ -322,15 +317,13 @@ func TestManager_BasicLifecycle(t *testing.T) {
require.NotNil(t, tt.setup)
require.NotNil(t, tt.expectSnap)
// Use a mocked cache to make life simpler
types := NewTestCacheTypes(t)
// Setup initial values
types.roots.Set(rootsCacheKey, roots)
types.leaf.Set(leafCacheKey, leaf)
types.intentions.Set(intentionCacheKey, TestIntentions())
types.configEntry.Set(meshCacheKey, &structs.ConfigEntryResponse{Entry: nil})
tt.setup(t, types)
dataSources := NewTestDataSources()
dataSources.LeafCertificate.Set(leafReq, leaf)
dataSources.CARoots.Set(rootsReq, roots)
dataSources.Intentions.Set(intentionReq, TestIntentions())
dataSources.ConfigEntry.Set(meshConfigReq, &structs.ConfigEntryResponse{Entry: nil})
tt.setup(t, dataSources)
expectSnapCopy, err := copystructure.Copy(tt.expectSnap)
require.NoError(t, err)
@ -338,8 +331,9 @@ func TestManager_BasicLifecycle(t *testing.T) {
webProxyCopy, err := copystructure.Copy(webProxy)
require.NoError(t, err)
testManager_BasicLifecycle(t, types,
rootsCacheKey, leafCacheKey,
testManager_BasicLifecycle(t,
dataSources,
rootsReq, leafReq,
roots,
webProxyCopy.(*structs.NodeService),
expectSnapCopy.(*ConfigSnapshot),
@ -350,30 +344,28 @@ func TestManager_BasicLifecycle(t *testing.T) {
type testcase_BasicLifecycle struct {
name string
setup func(t *testing.T, types *TestCacheTypes)
setup func(t *testing.T, dataSources *TestDataSources)
webProxy *structs.NodeService
expectSnap *ConfigSnapshot
}
func testManager_BasicLifecycle(
t *testing.T,
types *TestCacheTypes,
rootsCacheKey, leafCacheKey string,
dataSources *TestDataSources,
rootsReq *structs.DCSpecificRequest,
leafReq *cachetype.ConnectCALeafRequest,
roots *structs.IndexedCARoots,
webProxy *structs.NodeService,
expectSnap *ConfigSnapshot,
) {
c := TestCacheWithTypes(t, types)
logger := testutil.Logger(t)
source := &structs.QuerySource{Datacenter: "dc1"}
// Create manager
m, err := NewManager(ManagerConfig{
Cache: &CacheWrapper{c},
Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}},
Source: source,
Logger: logger,
Source: source,
Logger: logger,
DataSources: dataSources.ToDataSources(),
})
require.NoError(t, err)
@ -396,7 +388,7 @@ func testManager_BasicLifecycle(
assertWatchChanRecvs(t, wCh, expectSnap)
require.True(t, time.Since(start) >= coalesceTimeout)
assertLastReqArgs(t, types, "my-token", source)
assertLastReqArgs(t, dataSources, "my-token", source)
// Update NodeConfig
webProxy.Port = 7777
@ -420,11 +412,11 @@ func testManager_BasicLifecycle(
// This is actually sort of timing dependent - the cache background fetcher
// will still be fetching with the old token, but we rely on the fact that our
// mock type will have been blocked on those for a while.
assertLastReqArgs(t, types, "other-token", source)
assertLastReqArgs(t, dataSources, "other-token", source)
// Update roots
newRoots, newLeaf := TestCerts(t)
newRoots.Roots = append(newRoots.Roots, roots.Roots...)
types.roots.Set(rootsCacheKey, newRoots)
dataSources.CARoots.Set(rootsReq, newRoots)
// Expect new roots in snapshot
expectSnap.Roots = newRoots
@ -432,7 +424,7 @@ func testManager_BasicLifecycle(
assertWatchChanRecvs(t, wCh2, expectSnap)
// Update leaf
types.leaf.Set(leafCacheKey, newLeaf)
dataSources.LeafCertificate.Set(leafReq, newLeaf)
// Expect new roots in snapshot
expectSnap.ConnectProxy.Leaf = newLeaf
@ -500,7 +492,6 @@ func TestManager_deliverLatest(t *testing.T) {
// None of these need to do anything to test this method just be valid
logger := testutil.Logger(t)
cfg := ManagerConfig{
Cache: &CacheWrapper{cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})},
Source: &structs.QuerySource{
Node: "node1",
Datacenter: "dc1",
@ -555,21 +546,14 @@ func TestManager_deliverLatest(t *testing.T) {
require.Equal(t, snap2, <-ch5)
}
func testGenCacheKey(req cache.Request) string {
info := req.CacheInfo()
return path.Join(info.Key, info.Datacenter)
}
func TestManager_SyncState_No_Notify(t *testing.T) {
types := NewTestCacheTypes(t)
c := TestCacheWithTypes(t, types)
dataSources := NewTestDataSources()
logger := testutil.Logger(t)
m, err := NewManager(ManagerConfig{
Cache: &CacheWrapper{c},
Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}},
Source: &structs.QuerySource{Datacenter: "dc1"},
Logger: logger,
Source: &structs.QuerySource{Datacenter: "dc1"},
Logger: logger,
DataSources: dataSources.ToDataSources(),
})
require.NoError(t, err)
defer m.Close()

View File

@ -6,7 +6,6 @@ import (
"strings"
"time"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
)
@ -19,7 +18,7 @@ type handlerMeshGateway struct {
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -34,7 +33,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
// Eventually we will have to watch connect enabled instances for each service as well as the
// destination services themselves but those notifications will be setup later.
// We cannot setup those watches until we know what the services are.
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
err = s.dataSources.ServiceList.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -46,7 +45,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
}
// Watch service-resolvers so we can setup service subset clusters
err = s.cache.Notify(ctx, cachetype.ConfigEntryListName, &structs.ConfigEntryQuery{
err = s.dataSources.ConfigEntryList.Notify(ctx, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver,
@ -85,7 +84,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
// Conveniently we can just use this service meta attribute in one
// place here to set the machinery in motion and leave the conditional
// behavior out of the rest of the package.
err := s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
err := s.dataSources.FederationStateListMeshGateways.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -94,7 +93,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
return err
}
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
err = s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
@ -104,7 +103,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
}
}
err := s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
}, datacentersWatchID, s.ch)
if err != nil {
@ -168,7 +167,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Name,
@ -220,7 +219,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
if _, ok := snap.MeshGateway.WatchedGateways[gk.String()]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
err := s.dataSources.InternalServiceDump.Notify(ctx, &structs.ServiceDumpRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway,

View File

@ -11,29 +11,11 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
)
// UpdateEvent contains new data for a resource we are subscribed to (e.g. an
// agent cache entry).
type UpdateEvent struct {
CorrelationID string
Result interface{}
Err error
}
type CacheNotifier interface {
Notify(ctx context.Context, t string, r cache.Request,
correlationID string, ch chan<- UpdateEvent) error
}
type Health interface {
Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
const (
coalesceTimeout = 200 * time.Millisecond
rootsWatchID = "roots"
@ -61,8 +43,7 @@ const (
type stateConfig struct {
logger hclog.Logger
source *structs.QuerySource
cache CacheNotifier
health Health
dataSources DataSources
dnsConfig DNSConfig
serverSNIFn ServerSNIFunc
intentionDefaultAllow bool
@ -458,16 +439,16 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C
}
type gatewayWatchOpts struct {
notifier CacheNotifier
notifyCh chan UpdateEvent
source structs.QuerySource
token string
key GatewayKey
upstreamID UpstreamID
internalServiceDump InternalServiceDump
notifyCh chan UpdateEvent
source structs.QuerySource
token string
key GatewayKey
upstreamID UpstreamID
}
func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
return opts.notifier.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
return opts.internalServiceDump.Notify(ctx, &structs.ServiceDumpRequest{
Datacenter: opts.key.Datacenter,
QueryOptions: structs.QueryOptions{Token: opts.token},
ServiceKind: structs.ServiceKindMeshGateway,

View File

@ -0,0 +1,6 @@
//go:build !consulent
// +build !consulent
package proxycfg
func recordWatchesEnterprise(*stateConfig, *watchRecorder) {}

View File

@ -11,10 +11,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -114,100 +112,89 @@ func TestStateChanged(t *testing.T) {
}
}
type testCacheNotifierRequest struct {
cacheType string
request cache.Request
cb func(UpdateEvent)
func recordWatches(sc *stateConfig) *watchRecorder {
wr := newWatchRecorder()
sc.dataSources = DataSources{
CARoots: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
CompiledDiscoveryChain: typedWatchRecorder[*structs.DiscoveryChainRequest]{wr},
ConfigEntry: typedWatchRecorder[*structs.ConfigEntryQuery]{wr},
ConfigEntryList: typedWatchRecorder[*structs.ConfigEntryQuery]{wr},
Datacenters: typedWatchRecorder[*structs.DatacentersRequest]{wr},
FederationStateListMeshGateways: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
GatewayServices: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
Health: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
HTTPChecks: typedWatchRecorder[*cachetype.ServiceHTTPChecksRequest]{wr},
Intentions: typedWatchRecorder[*structs.IntentionQueryRequest]{wr},
IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr},
LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr},
PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr},
ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr},
ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
}
recordWatchesEnterprise(sc, wr)
return wr
}
type testCacheNotifier struct {
lock sync.RWMutex
notifiers map[string]testCacheNotifierRequest
}
func newTestCacheNotifier() *testCacheNotifier {
return &testCacheNotifier{
notifiers: make(map[string]testCacheNotifierRequest),
func newWatchRecorder() *watchRecorder {
return &watchRecorder{
watches: make(map[string]any),
}
}
func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- UpdateEvent) error {
cn.lock.Lock()
cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { ch <- event }}
cn.lock.Unlock()
type watchRecorder struct {
mu sync.Mutex
watches map[string]any
}
func (r *watchRecorder) record(correlationID string, req any) {
r.mu.Lock()
r.watches[correlationID] = req
r.mu.Unlock()
}
func (r *watchRecorder) verify(t *testing.T, correlationID string, verifyFn verifyWatchRequest) {
t.Helper()
r.mu.Lock()
req, ok := r.watches[correlationID]
r.mu.Unlock()
require.True(t, ok, "No such watch for Correlation ID: %q", correlationID)
if verifyFn != nil {
verifyFn(t, req)
}
}
type typedWatchRecorder[ReqType any] struct {
recorder *watchRecorder
}
func (r typedWatchRecorder[ReqType]) Notify(_ context.Context, req ReqType, correlationID string, _ chan<- UpdateEvent) error {
r.recorder.record(correlationID, req)
return nil
}
// NotifyCallback satisfies the health.CacheGetter interface.
func (cn *testCacheNotifier) NotifyCallback(ctx context.Context, t string, r cache.Request, correlationId string, cb cache.Callback) error {
cn.lock.Lock()
cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) {
cb(ctx, cache.UpdateEvent{
CorrelationID: event.CorrelationID,
Result: event.Result,
Err: event.Err,
})
}}
cn.lock.Unlock()
return nil
}
func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) {
panic("Get: not implemented")
}
func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest {
cn.lock.RLock()
req, ok := cn.notifiers[correlationId]
cn.lock.RUnlock()
require.True(t, ok, "Correlation ID: %s is missing", correlationId)
return req
}
func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event UpdateEvent) {
req := cn.getNotifierRequest(t, correlationId)
require.NotNil(t, req.cb)
req.cb(event)
}
func (cn *testCacheNotifier) verifyWatch(t testing.TB, correlationId string) (string, cache.Request) {
// t.Logf("Watches: %+v", cn.notifiers)
req := cn.getNotifierRequest(t, correlationId)
require.NotNil(t, req.cb)
return req.cacheType, req.request
}
type verifyWatchRequest func(t testing.TB, cacheType string, request cache.Request)
func genVerifyDCSpecificWatch(expectedCacheType string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, expectedCacheType, cacheType)
type verifyWatchRequest func(t testing.TB, request any)
func genVerifyDCSpecificWatch(expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.DCSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
}
}
func genVerifyRootsWatch(expectedDatacenter string) verifyWatchRequest {
return genVerifyDCSpecificWatch(cachetype.ConnectCARootName, expectedDatacenter)
}
func genVerifyListServicesWatch(expectedDatacenter string) verifyWatchRequest {
return genVerifyDCSpecificWatch(cachetype.CatalogServiceListName, expectedDatacenter)
}
func verifyDatacentersWatch(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.CatalogDatacentersName, cacheType)
func verifyDatacentersWatch(t testing.TB, request any) {
_, ok := request.(*structs.DatacentersRequest)
require.True(t, ok)
}
func genVerifyLeafWatchWithDNSSANs(expectedService string, expectedDatacenter string, expectedDNSSANs []string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ConnectCALeafName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*cachetype.ConnectCALeafRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -221,9 +208,7 @@ func genVerifyLeafWatch(expectedService string, expectedDatacenter string) verif
}
func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ConfigEntryName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ConfigEntryQuery)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -233,9 +218,7 @@ func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind st
}
func genVerifyResolvedConfigWatch(expectedService string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ResolvedServiceConfigName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ServiceConfigRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -244,9 +227,7 @@ func genVerifyResolvedConfigWatch(expectedService string, expectedDatacenter str
}
func genVerifyIntentionWatch(expectedService string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.IntentionMatchName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.IntentionQueryRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -258,21 +239,8 @@ func genVerifyIntentionWatch(expectedService string, expectedDatacenter string)
}
}
func genVerifyIntentionUpstreamsWatch(expectedService string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.IntentionUpstreamsName, cacheType)
reqReal, ok := request.(*structs.ServiceSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
require.Equal(t, expectedService, reqReal.ServiceName)
}
}
func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.PreparedQueryName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.PreparedQueryExecuteRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -282,9 +250,7 @@ func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string)
}
func genVerifyDiscoveryChainWatch(expected *structs.DiscoveryChainRequest) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.CompiledDiscoveryChainName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.DiscoveryChainRequest)
require.True(t, ok)
require.Equal(t, expected, reqReal)
@ -292,9 +258,7 @@ func genVerifyDiscoveryChainWatch(expected *structs.DiscoveryChainRequest) verif
}
func genVerifyMeshConfigWatch(expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ConfigEntryName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ConfigEntryQuery)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -304,9 +268,7 @@ func genVerifyMeshConfigWatch(expectedDatacenter string) verifyWatchRequest {
}
func genVerifyGatewayWatch(expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.InternalServiceDumpName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ServiceDumpRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -316,10 +278,8 @@ func genVerifyGatewayWatch(expectedDatacenter string) verifyWatchRequest {
}
}
func genVerifyServiceSpecificRequest(expectedCacheType, expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, expectedCacheType, cacheType)
func genVerifyServiceSpecificRequest(expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest {
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ServiceSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
@ -329,18 +289,12 @@ func genVerifyServiceSpecificRequest(expectedCacheType, expectedService, expecte
}
}
func genVerifyServiceWatch(expectedService, expectedFilter, expectedDatacenter string, connect bool) verifyWatchRequest {
return genVerifyServiceSpecificRequest(cachetype.HealthServicesName, expectedService, expectedFilter, expectedDatacenter, connect)
}
func genVerifyGatewayServiceWatch(expectedService, expectedDatacenter string) verifyWatchRequest {
return genVerifyServiceSpecificRequest(cachetype.GatewayServicesName, expectedService, "", expectedDatacenter, false)
return genVerifyServiceSpecificRequest(expectedService, "", expectedDatacenter, false)
}
func genVerifyConfigEntryWatch(expectedKind, expectedName, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ConfigEntryName, cacheType)
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.ConfigEntryQuery)
require.True(t, ok)
require.Equal(t, expectedKind, reqReal.Kind)
@ -507,11 +461,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stage0 := verificationStage{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("web", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("web", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
@ -562,6 +513,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: meshGatewayProxyConfigValue,
},
}),
"upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
},
events: []UpdateEvent{
rootWatchEvent(),
@ -660,10 +614,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stage1 := verificationStage{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceWatch("api", "", "dc1", true),
fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceSpecificRequest("api", "", "dc1", true),
fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-remote", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-local", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-direct", "", "dc2", true),
fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"),
fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"),
},
@ -743,9 +697,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
serviceListWatchID: genVerifyListServicesWatch("dc1"),
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
@ -804,9 +758,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
serviceListWatchID: genVerifyListServicesWatch("dc1"),
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
},
events: []UpdateEvent{
rootWatchEvent(),
@ -948,9 +902,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
@ -1061,7 +1015,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true),
"upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceSpecificRequest("api", "", "dc1", true),
},
events: []UpdateEvent{
{
@ -1117,9 +1071,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"),
},
events: []UpdateEvent{
@ -1197,9 +1151,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"),
},
events: []UpdateEvent{
@ -1290,10 +1244,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID,
"terminating-gateway", "", "dc1", false),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("terminating-gateway", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
@ -1333,10 +1286,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID,
"terminating-gateway", "", "dc1", false),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
gatewayServicesWatchID: genVerifyGatewayServiceWatch("terminating-gateway", "dc1"),
},
events: []UpdateEvent{
rootWatchEvent(),
@ -1425,7 +1377,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
{
requiredWatches: map[string]verifyWatchRequest{
"external-service:" + db.String(): genVerifyServiceWatch("db", "", "dc1", false),
"external-service:" + db.String(): genVerifyServiceSpecificRequest("db", "", "dc1", false),
},
events: []UpdateEvent{
{
@ -1470,7 +1422,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
{
requiredWatches: map[string]verifyWatchRequest{
"external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false),
"external-service:" + api.String(): genVerifyServiceSpecificRequest("api", "", "dc1", false),
},
events: []UpdateEvent{
{
@ -1706,12 +1658,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
@ -1792,12 +1743,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
@ -1852,11 +1802,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Receiving an intention should lead to spinning up a discovery chain watch
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
events: []UpdateEvent{
{
@ -1917,7 +1866,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true),
"upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceSpecificRequest("db", "", "dc1", true),
},
events: []UpdateEvent{
{
@ -2284,11 +2233,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{
// Empty list of upstreams should clean up map keys
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
events: []UpdateEvent{
{
@ -2351,12 +2299,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
@ -2365,6 +2310,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
@ -2452,11 +2399,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// be deleted from the snapshot.
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest("api", "", "dc1", false),
"discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
@ -2465,6 +2409,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
events: []UpdateEvent{
{
@ -2526,8 +2472,6 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// First evaluate peered upstream
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
@ -2535,6 +2479,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
EvaluateInPartition: "default",
Datacenter: "dc1",
}),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
// No Peering watch
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
@ -2604,12 +2550,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cn := newTestCacheNotifier()
proxyID := ProxyID{ServiceID: tc.ns.CompoundServiceID()}
state, err := newState(proxyID, &tc.ns, testSource, "", stateConfig{
sc := stateConfig{
logger: testutil.Logger(t),
cache: cn,
health: &HealthWrapper{&health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}},
source: &structs.QuerySource{
Datacenter: tc.sourceDC,
},
@ -2617,7 +2561,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Domain: "consul.",
AltDomain: "alt.consul.",
},
})
}
wr := recordWatches(&sc)
state, err := newState(proxyID, &tc.ns, testSource, "", sc)
// verify building the initial state worked
require.NoError(t, err)
@ -2644,13 +2591,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, t.Run(fmt.Sprintf("stage-%d", idx), func(t *testing.T) {
for correlationId, verifier := range stage.requiredWatches {
require.True(t, t.Run(correlationId, func(t *testing.T) {
// verify that the watch was initiated
cacheType, request := cn.verifyWatch(t, correlationId)
// run the verifier if any
if verifier != nil {
verifier(t, cacheType, request)
}
wr.verify(t, correlationId, verifier)
}))
}

View File

@ -17,7 +17,7 @@ type handlerTerminatingGateway struct {
func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
@ -28,7 +28,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps
}
// Get information about the entire service mesh.
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.MeshConfig,
Name: structs.MeshConfigMesh,
Datacenter: s.source.Datacenter,
@ -40,7 +40,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps
}
// Watch for the terminating-gateway's linked services
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
err = s.dataSources.GatewayServices.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.service,
@ -116,7 +116,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// Watch the health endpoint to discover endpoints for the service
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Service.Name,
@ -141,7 +141,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// The gateway will enforce intentions for connections to the service
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
err := s.dataSources.Intentions.Notify(ctx, &structs.IntentionQueryRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Match: &structs.IntentionQueryMatch{
@ -171,7 +171,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// This cert is used to terminate mTLS connections on the service's behalf
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: svc.Service.Name,
@ -193,7 +193,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// These are used to determine the protocol for the target service.
if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{
err := s.dataSources.ResolvedServiceConfig.Notify(ctx, &structs.ServiceConfigRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: svc.Service.Name,
@ -215,7 +215,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// These are used to create clusters and endpoints for the service subsets
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
err := s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver,

View File

@ -22,53 +22,6 @@ import (
"github.com/hashicorp/consul/api"
)
// TestCacheTypes encapsulates all the different cache types proxycfg.State will
// watch/request for controlling one during testing.
type TestCacheTypes struct {
roots *ControllableCacheType
leaf *ControllableCacheType
intentions *ControllableCacheType
health *ControllableCacheType
query *ControllableCacheType
compiledChain *ControllableCacheType
serviceHTTPChecks *ControllableCacheType
configEntry *ControllableCacheType
}
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
func NewTestCacheTypes(t testing.T) *TestCacheTypes {
t.Helper()
ct := &TestCacheTypes{
roots: NewControllableCacheType(t),
leaf: NewControllableCacheType(t),
intentions: NewControllableCacheType(t),
health: NewControllableCacheType(t),
query: NewControllableCacheType(t),
compiledChain: NewControllableCacheType(t),
serviceHTTPChecks: NewControllableCacheType(t),
configEntry: NewControllableCacheType(t),
}
ct.query.blocking = false
return ct
}
// TestCacheWithTypes registers ControllableCacheTypes for all types that
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
c := cache.New(cache.Options{})
c.RegisterType(cachetype.ConnectCARootName, types.roots)
c.RegisterType(cachetype.ConnectCALeafName, types.leaf)
c.RegisterType(cachetype.IntentionMatchName, types.intentions)
c.RegisterType(cachetype.HealthServicesName, types.health)
c.RegisterType(cachetype.PreparedQueryName, types.query)
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain)
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks)
c.RegisterType(cachetype.ConfigEntryName, types.configEntry)
return c
}
// TestCerts generates a CA and Leaf suitable for returning as mock CA
// root/leaf cache requests.
func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) {
@ -668,19 +621,9 @@ func TestGatewayServiceGroupFooDC1(t testing.T) structs.CheckServiceNodes {
}
}
type noopCacheNotifier struct{}
type noopDataSource[ReqType any] struct{}
var _ CacheNotifier = (*noopCacheNotifier)(nil)
func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- UpdateEvent) error {
return nil
}
type noopHealth struct{}
var _ Health = (*noopHealth)(nil)
func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- UpdateEvent) error {
func (*noopDataSource[ReqType]) Notify(context.Context, ReqType, string, chan<- UpdateEvent) error {
return nil
}
@ -711,8 +654,24 @@ func testConfigSnapshotFixture(
source: &structs.QuerySource{
Datacenter: "dc1",
},
cache: &noopCacheNotifier{},
health: &noopHealth{},
dataSources: DataSources{
CARoots: &noopDataSource[*structs.DCSpecificRequest]{},
CompiledDiscoveryChain: &noopDataSource[*structs.DiscoveryChainRequest]{},
ConfigEntry: &noopDataSource[*structs.ConfigEntryQuery]{},
ConfigEntryList: &noopDataSource[*structs.ConfigEntryQuery]{},
Datacenters: &noopDataSource[*structs.DatacentersRequest]{},
FederationStateListMeshGateways: &noopDataSource[*structs.DCSpecificRequest]{},
GatewayServices: &noopDataSource[*structs.ServiceSpecificRequest]{},
Health: &noopDataSource[*structs.ServiceSpecificRequest]{},
HTTPChecks: &noopDataSource[*cachetype.ServiceHTTPChecksRequest]{},
Intentions: &noopDataSource[*structs.IntentionQueryRequest]{},
IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{},
InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{},
LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{},
PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{},
ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{},
ServiceList: &noopDataSource[*structs.DCSpecificRequest]{},
},
dnsConfig: DNSConfig{ // TODO: make configurable
Domain: "consul",
AltDomain: "",
@ -720,6 +679,7 @@ func testConfigSnapshotFixture(
serverSNIFn: serverSNIFn,
intentionDefaultAllow: false, // TODO: make configurable
}
testConfigSnapshotFixtureEnterprise(&config)
s, err := newServiceInstanceFromNodeService(ProxyID{ServiceID: ns.CompoundServiceID()}, ns, token)
if err != nil {
t.Fatalf("err: %v", err)
@ -889,3 +849,159 @@ func projectRoot() string {
_, base, _, _ := runtime.Caller(0)
return filepath.Dir(base)
}
// NewTestDataSources creates a set of data sources that can be used to provide
// the Manager with data in tests.
func NewTestDataSources() *TestDataSources {
srcs := &TestDataSources{
CARoots: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots](),
CompiledDiscoveryChain: NewTestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse](),
ConfigEntry: NewTestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse](),
ConfigEntryList: NewTestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries](),
Datacenters: NewTestDataSource[*structs.DatacentersRequest, *[]string](),
FederationStateListMeshGateways: NewTestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes](),
GatewayServices: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices](),
Health: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes](),
HTTPChecks: NewTestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType](),
Intentions: NewTestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches](),
IntentionUpstreams: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
InternalServiceDump: NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways](),
LeafCertificate: NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert](),
PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](),
ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](),
ServiceList: NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList](),
}
srcs.buildEnterpriseSources()
return srcs
}
type TestDataSources struct {
CARoots *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots]
CompiledDiscoveryChain *TestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse]
ConfigEntry *TestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse]
ConfigEntryList *TestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries]
FederationStateListMeshGateways *TestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes]
Datacenters *TestDataSource[*structs.DatacentersRequest, *[]string]
GatewayServices *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices]
Health *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes]
HTTPChecks *TestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType]
Intentions *TestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches]
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse]
ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList]
TestDataSourcesEnterprise
}
func (t *TestDataSources) ToDataSources() DataSources {
ds := DataSources{
CARoots: t.CARoots,
CompiledDiscoveryChain: t.CompiledDiscoveryChain,
ConfigEntry: t.ConfigEntry,
ConfigEntryList: t.ConfigEntryList,
Datacenters: t.Datacenters,
GatewayServices: t.GatewayServices,
Health: t.Health,
HTTPChecks: t.HTTPChecks,
Intentions: t.Intentions,
IntentionUpstreams: t.IntentionUpstreams,
InternalServiceDump: t.InternalServiceDump,
LeafCertificate: t.LeafCertificate,
PreparedQuery: t.PreparedQuery,
ResolvedServiceConfig: t.ResolvedServiceConfig,
ServiceList: t.ServiceList,
}
t.fillEnterpriseDataSources(&ds)
return ds
}
// NewTestDataSource creates a test data source that accepts requests to Notify
// of type RequestType and dispatches UpdateEvents with a result of type ValType.
//
// TODO(agentless): we still depend on cache.Request here because it provides the
// CacheInfo method used for hashing the request - this won't work when we extract
// this package into a shared library.
func NewTestDataSource[ReqType cache.Request, ValType any]() *TestDataSource[ReqType, ValType] {
return &TestDataSource[ReqType, ValType]{
data: make(map[string]ValType),
trigger: make(chan struct{}),
}
}
type TestDataSource[ReqType cache.Request, ValType any] struct {
mu sync.Mutex
data map[string]ValType
lastReq ReqType
// Note: trigger is currently global for all requests of the given type, so
// Manager may receive duplicate events - as the dispatch goroutine will be
// woken up whenever *any* requested data changes.
trigger chan struct{}
}
// Notify satisfies the interfaces used by Manager to subscribe to data.
func (t *TestDataSource[ReqType, ValType]) Notify(ctx context.Context, req ReqType, correlationID string, ch chan<- UpdateEvent) error {
t.mu.Lock()
t.lastReq = req
t.mu.Unlock()
go t.dispatch(ctx, correlationID, t.reqKey(req), ch)
return nil
}
func (t *TestDataSource[ReqType, ValType]) dispatch(ctx context.Context, correlationID, key string, ch chan<- UpdateEvent) {
for {
t.mu.Lock()
val, ok := t.data[key]
trigger := t.trigger
t.mu.Unlock()
if ok {
event := UpdateEvent{
CorrelationID: correlationID,
Result: val,
}
select {
case ch <- event:
case <-ctx.Done():
}
}
select {
case <-trigger:
case <-ctx.Done():
return
}
}
}
func (t *TestDataSource[ReqType, ValType]) reqKey(req ReqType) string {
return req.CacheInfo().Key
}
// Set broadcasts the given value to consumers that subscribed with the given
// request.
func (t *TestDataSource[ReqType, ValType]) Set(req ReqType, val ValType) error {
t.mu.Lock()
t.data[t.reqKey(req)] = val
oldTrigger := t.trigger
t.trigger = make(chan struct{})
t.mu.Unlock()
close(oldTrigger)
return nil
}
// LastReq returns the request from the last call to Notify that was received.
func (t *TestDataSource[ReqType, ValType]) LastReq() ReqType {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastReq
}

View File

@ -0,0 +1,12 @@
//go:build !consulent
// +build !consulent
package proxycfg
type TestDataSourcesEnterprise struct{}
func (*TestDataSources) buildEnterpriseSources() {}
func (*TestDataSources) fillEnterpriseDataSources(*DataSources) {}
func testConfigSnapshotFixtureEnterprise(*stateConfig) {}

View File

@ -9,7 +9,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
@ -314,12 +313,12 @@ func (s *handlerUpstreams) resetWatchesFromChain(
ctx, cancel := context.WithCancel(ctx)
opts := gatewayWatchOpts{
notifier: s.cache,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gwKey,
upstreamID: uid,
internalServiceDump: s.dataSources.InternalServiceDump,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gwKey,
upstreamID: uid,
}
err := watchMeshGateway(ctx, opts)
if err != nil {
@ -372,7 +371,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String()
ctx, cancel := context.WithCancel(ctx)
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
PeerName: opts.upstreamID.Peer,
Datacenter: opts.datacenter,
QueryOptions: structs.QueryOptions{
@ -413,7 +412,7 @@ func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *Config
}
ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: opts.name,