From 5fe7043439b5bd4f3394094ecab2cd6ab9a427e0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 14 Apr 2020 18:29:30 -0400 Subject: [PATCH] agent/cache: Make all cache options RegisterOptions Previously the SupportsBlocking option was specified by a method on the type, and all the other options were specified from RegisterOptions. This change moves RegisterOptions to a method on the type, and moves SupportsBlocking into the options struct. Currently there are only 2 cache-types. So all cache-types can implement this method by embedding a struct with those predefined values. In the future if a cache type needs to be registered more than once with different options it can remove the embedded type and implement the method in a way that allows for paramaterization. --- agent/agent.go | 140 +++--------------- agent/cache-types/catalog_datacenters.go | 5 +- agent/cache-types/catalog_list_services.go | 5 +- agent/cache-types/catalog_service_list.go | 5 +- agent/cache-types/catalog_services.go | 5 +- agent/cache-types/config_entry.go | 5 +- agent/cache-types/connect_ca_leaf.go | 5 +- agent/cache-types/connect_ca_leaf_test.go | 24 ++- agent/cache-types/connect_ca_root.go | 5 +- agent/cache-types/discovery_chain.go | 5 +- .../federation_state_list_gateways.go | 5 +- agent/cache-types/gateway_services.go | 5 +- agent/cache-types/health_services.go | 5 +- agent/cache-types/intention_match.go | 5 +- agent/cache-types/node_services.go | 5 +- agent/cache-types/options.go | 36 +++++ agent/cache-types/prepared_query.go | 8 +- agent/cache-types/resolved_service_config.go | 5 +- agent/cache-types/service_checks.go | 5 +- agent/cache-types/service_dump.go | 5 +- agent/cache/cache.go | 18 ++- agent/cache/cache_test.go | 134 +++++++++-------- agent/cache/mock_Request.go | 3 +- agent/cache/mock_Type.go | 11 +- agent/cache/testing.go | 17 ++- agent/cache/type.go | 8 +- agent/cache/watch.go | 2 +- agent/cache/watch_test.go | 18 +-- agent/proxycfg/testing.go | 45 ++---- 29 files changed, 213 insertions(+), 331 deletions(-) create mode 100644 agent/cache-types/options.go diff --git a/agent/agent.go b/agent/agent.go index 27a5b2a402..b0287ea952 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4182,151 +4182,45 @@ func (a *Agent) registerCache() { // the a.delegate directly, otherwise tests that rely on overriding RPC // routing via a.registerEndpoint will not work. - a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{RPC: a}) a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{ RPC: a, Cache: a.cache, Datacenter: a.config.Datacenter, TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, }) - a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a}) - a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a}) - a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a}) - a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{ - RPC: a, - }, &cache.RegisterOptions{ - // Prepared queries don't support blocking - Refresh: false, - }) + a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{RPC: a}) - a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{RPC: a}) - a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{RPC: a}) - a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{ - RPC: a, - }, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{RPC: a}) - a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{ - RPC: a, - }, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{RPC: a}) - a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{ - RPC: a, - }, &cache.RegisterOptions{ - Refresh: false, - }) + a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{RPC: a}) - a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{ - RPC: a, - }, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{RPC: a}) - a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{RPC: a}) - a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a}) - a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{ - RPC: a, - }, &cache.RegisterOptions{ - // Maintain a blocking query, retry dropped connections quickly - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{RPC: a}) - a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{ - Agent: a, - }, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{Agent: a}) - a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{ - RPC: a, - }, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0 * time.Second, - RefreshTimeout: 10 * time.Minute, - }) + a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, + &cachetype.FederationStateListMeshGateways{RPC: a}) } // LocalState returns the agent's local state diff --git a/agent/cache-types/catalog_datacenters.go b/agent/cache-types/catalog_datacenters.go index 2c50b19b7f..46c7e97e40 100644 --- a/agent/cache-types/catalog_datacenters.go +++ b/agent/cache-types/catalog_datacenters.go @@ -12,6 +12,7 @@ const CatalogDatacentersName = "catalog-datacenters" // Datacenters supports fetching discovering all the known datacenters type CatalogDatacenters struct { + RegisterOptionsNoRefresh RPC RPC } @@ -75,7 +76,3 @@ func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) ( return result, nil } - -func (c *CatalogDatacenters) SupportsBlocking() bool { - return false -} diff --git a/agent/cache-types/catalog_list_services.go b/agent/cache-types/catalog_list_services.go index 8434d56fc1..7d38f23ae3 100644 --- a/agent/cache-types/catalog_list_services.go +++ b/agent/cache-types/catalog_list_services.go @@ -12,6 +12,7 @@ const CatalogListServicesName = "catalog-list-services" // CatalogListServices supports fetching discovering service names via the catalog. type CatalogListServices struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) result.Index = reply.QueryMeta.Index return result, nil } - -func (c *CatalogListServices) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/catalog_service_list.go b/agent/cache-types/catalog_service_list.go index 3bcfcd3714..aacdf3e2e2 100644 --- a/agent/cache-types/catalog_service_list.go +++ b/agent/cache-types/catalog_service_list.go @@ -12,6 +12,7 @@ const CatalogServiceListName = "catalog-services-list" // CatalogServiceList supports fetching service names via the catalog. type CatalogServiceList struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (c *CatalogServiceList) Fetch(opts cache.FetchOptions, req cache.Request) ( result.Index = reply.QueryMeta.Index return result, nil } - -func (c *CatalogServiceList) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/catalog_services.go b/agent/cache-types/catalog_services.go index f317491bab..43559de423 100644 --- a/agent/cache-types/catalog_services.go +++ b/agent/cache-types/catalog_services.go @@ -13,6 +13,7 @@ const CatalogServicesName = "catalog-services" // CatalogServices supports fetching discovering service instances via the // catalog. type CatalogServices struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -50,7 +51,3 @@ func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac result.Index = reply.QueryMeta.Index return result, nil } - -func (c *CatalogServices) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/config_entry.go b/agent/cache-types/config_entry.go index d62f9a8a65..b1787a93a2 100644 --- a/agent/cache-types/config_entry.go +++ b/agent/cache-types/config_entry.go @@ -12,6 +12,7 @@ const ConfigEntriesName = "config-entries" // ConfigEntries supports fetching discovering configuration entries type ConfigEntries struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (c *ConfigEntries) Fetch(opts cache.FetchOptions, req cache.Request) (cache result.Index = reply.QueryMeta.Index return result, nil } - -func (c *ConfigEntries) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/connect_ca_leaf.go b/agent/cache-types/connect_ca_leaf.go index fd09919dfb..009c6ce80e 100644 --- a/agent/cache-types/connect_ca_leaf.go +++ b/agent/cache-types/connect_ca_leaf.go @@ -50,6 +50,7 @@ const caChangeJitterWindow = 30 * time.Second // ConnectCALeaf supports fetching and generating Connect leaf // certificates. type ConnectCALeaf struct { + RegisterOptionsBlockingRefresh caIndex uint64 // Current index for CA roots // rootWatchMu protects access to the rootWatchSubscribers map and @@ -629,10 +630,6 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, return result, nil } -func (c *ConnectCALeaf) SupportsBlocking() bool { - return true -} - // ConnectCALeafRequest is the cache.Request implementation for the // ConnectCALeaf cache type. This is implemented here and not in structs // since this is only used for cache-related requests and not forwarded diff --git a/agent/cache-types/connect_ca_leaf_test.go b/agent/cache-types/connect_ca_leaf_test.go index ec2dd6e905..bc3b6b3ee0 100644 --- a/agent/cache-types/connect_ca_leaf_test.go +++ b/agent/cache-types/connect_ca_leaf_test.go @@ -966,6 +966,21 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) { } } +// testConnectCaRoot wraps ConnectCARoot to disable refresh so that the gated +// channel controls the request directly. Otherwise, we get background refreshes and +// it screws up the ordering of the channel reads of the testGatedRootsRPC +// implementation. +type testConnectCaRoot struct { + ConnectCARoot +} + +func (r testConnectCaRoot) RegisterOptions() cache.RegisterOptions { + return cache.RegisterOptions{ + Refresh: false, + SupportsBlocking: true, + } +} + // testCALeafType returns a *ConnectCALeaf that is pre-configured to // use the given RPC implementation for "ConnectCA.Sign" operations. func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.IndexedCARoots) { @@ -977,14 +992,9 @@ func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.Indexed // Create a cache c := cache.TestCache(t) - c.RegisterType(ConnectCARootName, &ConnectCARoot{RPC: rootsRPC}, &cache.RegisterOptions{ - // Disable refresh so that the gated channel controls the - // request directly. Otherwise, we get background refreshes and - // it screws up the ordering of the channel reads of the - // testGatedRootsRPC implementation. - Refresh: false, + c.RegisterType(ConnectCARootName, &testConnectCaRoot{ + ConnectCARoot: ConnectCARoot{RPC: rootsRPC}, }) - // Create the leaf type return &ConnectCALeaf{ RPC: rpc, diff --git a/agent/cache-types/connect_ca_root.go b/agent/cache-types/connect_ca_root.go index aa48245a50..e4f3816f88 100644 --- a/agent/cache-types/connect_ca_root.go +++ b/agent/cache-types/connect_ca_root.go @@ -14,6 +14,7 @@ const ConnectCARootName = "connect-ca-root" // straightforward cache type since it only has to block on the given // index and return the data. type ConnectCARoot struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -45,7 +46,3 @@ func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache result.Index = reply.QueryMeta.Index return result, nil } - -func (c *ConnectCARoot) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/discovery_chain.go b/agent/cache-types/discovery_chain.go index 60e386deb3..5dd6eaed2e 100644 --- a/agent/cache-types/discovery_chain.go +++ b/agent/cache-types/discovery_chain.go @@ -13,6 +13,7 @@ const CompiledDiscoveryChainName = "compiled-discovery-chain" // CompiledDiscoveryChain supports fetching the complete discovery chain for a // service and caching its compilation. type CompiledDiscoveryChain struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -50,7 +51,3 @@ func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Reques result.Index = reply.QueryMeta.Index return result, nil } - -func (c *CompiledDiscoveryChain) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/federation_state_list_gateways.go b/agent/cache-types/federation_state_list_gateways.go index f2f8892eda..83df66978e 100644 --- a/agent/cache-types/federation_state_list_gateways.go +++ b/agent/cache-types/federation_state_list_gateways.go @@ -12,6 +12,7 @@ const FederationStateListMeshGatewaysName = "federation-state-list-mesh-gateways // FederationState supports fetching federation states. type FederationStateListMeshGateways struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (c *FederationStateListMeshGateways) Fetch(opts cache.FetchOptions, req cac result.Index = reply.QueryMeta.Index return result, nil } - -func (c *FederationStateListMeshGateways) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/gateway_services.go b/agent/cache-types/gateway_services.go index 30da6a1c95..96d7575baf 100644 --- a/agent/cache-types/gateway_services.go +++ b/agent/cache-types/gateway_services.go @@ -12,6 +12,7 @@ const GatewayServicesName = "gateway-services" // GatewayUpstreams supports fetching upstreams for a given gateway name. type GatewayServices struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac result.Index = reply.QueryMeta.Index return result, nil } - -func (g *GatewayServices) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/health_services.go b/agent/cache-types/health_services.go index b3ccf8391f..d73b952090 100644 --- a/agent/cache-types/health_services.go +++ b/agent/cache-types/health_services.go @@ -13,6 +13,7 @@ const HealthServicesName = "health-services" // HealthServices supports fetching discovering service instances via the // catalog. type HealthServices struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -50,7 +51,3 @@ func (c *HealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cach result.Index = reply.QueryMeta.Index return result, nil } - -func (c *HealthServices) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/intention_match.go b/agent/cache-types/intention_match.go index 0b960f0de4..688adfe438 100644 --- a/agent/cache-types/intention_match.go +++ b/agent/cache-types/intention_match.go @@ -12,6 +12,7 @@ const IntentionMatchName = "intention-match" // IntentionMatch supports fetching the intentions via match queries. type IntentionMatch struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -43,7 +44,3 @@ func (c *IntentionMatch) Fetch(opts cache.FetchOptions, req cache.Request) (cach result.Index = reply.Index return result, nil } - -func (c *IntentionMatch) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/node_services.go b/agent/cache-types/node_services.go index 53ddd03bea..9ead9ba436 100644 --- a/agent/cache-types/node_services.go +++ b/agent/cache-types/node_services.go @@ -13,6 +13,7 @@ const NodeServicesName = "node-services" // NodeServices supports fetching discovering service instances via the // catalog. type NodeServices struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -50,7 +51,3 @@ func (c *NodeServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache. result.Index = reply.QueryMeta.Index return result, nil } - -func (c *NodeServices) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/options.go b/agent/cache-types/options.go new file mode 100644 index 0000000000..6864258e8e --- /dev/null +++ b/agent/cache-types/options.go @@ -0,0 +1,36 @@ +package cachetype + +import ( + "time" + + "github.com/hashicorp/consul/agent/cache" +) + +// RegisterOptionsBlockingRefresh can be embedded into a struct to implement +// part of the agent/cache.Type interface. +// When embedded into a struct it identifies the cache type as one which +// supports blocking, and uses refresh to keep the cache fresh. +type RegisterOptionsBlockingRefresh struct{} + +func (r RegisterOptionsBlockingRefresh) RegisterOptions() cache.RegisterOptions { + return cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + SupportsBlocking: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + } +} + +// RegisterOptionsNoRefresh can be embedded into a struct to implement +// part of the agent/cache.Type interface. +// When embedded into a struct it identifies the cache type as one which +// does not support blocking, and should not be refreshed. +type RegisterOptionsNoRefresh struct{} + +func (r RegisterOptionsNoRefresh) RegisterOptions() cache.RegisterOptions { + return cache.RegisterOptions{ + Refresh: false, + SupportsBlocking: false, + } +} diff --git a/agent/cache-types/prepared_query.go b/agent/cache-types/prepared_query.go index d7ec601ac5..3592bc2106 100644 --- a/agent/cache-types/prepared_query.go +++ b/agent/cache-types/prepared_query.go @@ -13,10 +13,11 @@ const PreparedQueryName = "prepared-query" // PreparedQuery supports fetching discovering service instances via prepared // queries. type PreparedQuery struct { + RegisterOptionsNoRefresh RPC RPC } -func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { +func (c *PreparedQuery) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { var result cache.FetchResult // The request should be a PreparedQueryExecuteRequest. @@ -47,8 +48,3 @@ func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache return result, nil } - -func (c *PreparedQuery) SupportsBlocking() bool { - // Prepared queries don't support blocking. - return false -} diff --git a/agent/cache-types/resolved_service_config.go b/agent/cache-types/resolved_service_config.go index a78c0ce812..7c17e06186 100644 --- a/agent/cache-types/resolved_service_config.go +++ b/agent/cache-types/resolved_service_config.go @@ -13,6 +13,7 @@ const ResolvedServiceConfigName = "resolved-service-config" // ResolvedServiceConfig supports fetching the config for a service resolved from // the global proxy defaults and the centrally registered service config. type ResolvedServiceConfig struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -50,7 +51,3 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request result.Index = reply.QueryMeta.Index return result, nil } - -func (c *ResolvedServiceConfig) SupportsBlocking() bool { - return true -} diff --git a/agent/cache-types/service_checks.go b/agent/cache-types/service_checks.go index 6f7af528ff..bb346390c3 100644 --- a/agent/cache-types/service_checks.go +++ b/agent/cache-types/service_checks.go @@ -24,6 +24,7 @@ type Agent interface { // ServiceHTTPBasedChecks supports fetching discovering checks in the local state type ServiceHTTPChecks struct { + RegisterOptionsBlockingRefresh Agent Agent } @@ -91,10 +92,6 @@ func (c *ServiceHTTPChecks) Fetch(opts cache.FetchOptions, req cache.Request) (c return result, nil } -func (c *ServiceHTTPChecks) SupportsBlocking() bool { - return true -} - // ServiceHTTPChecksRequest is the cache.Request implementation for the // ServiceHTTPBasedChecks cache type. This is implemented here and not in structs // since this is only used for cache-related requests and not forwarded diff --git a/agent/cache-types/service_dump.go b/agent/cache-types/service_dump.go index d4653ee2d0..6bb633b77f 100644 --- a/agent/cache-types/service_dump.go +++ b/agent/cache-types/service_dump.go @@ -12,6 +12,7 @@ const InternalServiceDumpName = "service-dump" // InternalServiceDump supports fetching discovering service names via the catalog. type InternalServiceDump struct { + RegisterOptionsBlockingRefresh RPC RPC } @@ -49,7 +50,3 @@ func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request) result.Index = reply.QueryMeta.Index return result, nil } - -func (c *InternalServiceDump) SupportsBlocking() bool { - return true -} diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 66146610f2..3603c31514 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -160,6 +160,12 @@ type RegisterOptions struct { // is to only request data on explicit Get. Refresh bool + // SupportsBlocking should be set to true if the type supports blocking queries. + // Types that do not support blocking queries will not be able to use + // background refresh nor will the cache attempt blocking fetches if the + // client requests them with MinIndex. + SupportsBlocking bool + // RefreshTimer is the time between attempting to refresh data. // If this is zero, then data is refreshed immediately when a fetch // is returned. @@ -185,17 +191,15 @@ type RegisterOptions struct { // // This makes the type available for Get but does not automatically perform // any prefetching. In order to populate the cache, Get must be called. -func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { - if opts == nil { - opts = &RegisterOptions{} - } +func (c *Cache) RegisterType(n string, typ Type) { + opts := typ.RegisterOptions() if opts.LastGetTTL == 0 { opts.LastGetTTL = 72 * time.Hour // reasonable default is days } c.typesLock.Lock() defer c.typesLock.Unlock() - c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts} + c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts} } // Get loads the data for the given type and request. If data satisfying the @@ -241,7 +245,7 @@ func (c *Cache) getEntryLocked( // Check index is not specified or lower than value, or the type doesn't // support blocking. - if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index { + if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value below. return true, false, entry @@ -465,7 +469,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at } fOpts := FetchOptions{} - if tEntry.Type.SupportsBlocking() { + if tEntry.Opts.SupportsBlocking { fOpts.MinIndex = entry.Index fOpts.Timeout = tEntry.Opts.RefreshTimeout } diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8d74558399..32cc2dc417 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -23,7 +23,7 @@ func TestCacheGet_noIndex(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 42}, nil).Times(1) @@ -56,7 +56,7 @@ func TestCacheGet_initError(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type fetcherr := fmt.Errorf("error") @@ -91,7 +91,7 @@ func TestCacheGet_cachedErrorsDontStick(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type fetcherr := fmt.Errorf("initial error") @@ -152,7 +152,7 @@ func TestCacheGet_blankCacheKey(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 42}, nil).Times(2) @@ -183,7 +183,7 @@ func TestCacheGet_blockingInitSameKey(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type triggerCh := make(chan time.Time) @@ -220,7 +220,7 @@ func TestCacheGet_blockingInitDiffKeys(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Keep track of the keys var keysLock sync.Mutex @@ -270,7 +270,7 @@ func TestCacheGet_blockingIndex(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type triggerCh := make(chan time.Time) @@ -304,7 +304,7 @@ func TestCacheGet_blockingIndexTimeout(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type triggerCh := make(chan time.Time) @@ -340,7 +340,7 @@ func TestCacheGet_blockingIndexError(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type var retries uint32 @@ -377,7 +377,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { typ := TestType(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) stateCh := make(chan int, 1) @@ -440,14 +440,15 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { func TestCacheGet_periodicRefresh(t *testing.T) { t.Parallel() - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: true, RefreshTimer: 100 * time.Millisecond, RefreshTimeout: 5 * time.Minute, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // This is a bit weird, but we do this to ensure that the final // call to the Fetch (if it happens, depends on timing) just blocks. @@ -479,14 +480,15 @@ func TestCacheGet_periodicRefresh(t *testing.T) { func TestCacheGet_periodicRefreshMultiple(t *testing.T) { t.Parallel() - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: true, RefreshTimer: 0 * time.Millisecond, RefreshTimeout: 5 * time.Minute, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // This is a bit weird, but we do this to ensure that the final // call to the Fetch (if it happens, depends on timing) just blocks. @@ -527,14 +529,15 @@ func TestCacheGet_periodicRefreshMultiple(t *testing.T) { func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) { t.Parallel() - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: true, RefreshTimer: 0, RefreshTimeout: 5 * time.Minute, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // Configure the type var retries uint32 @@ -568,14 +571,15 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) { func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) { t.Parallel() - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: true, RefreshTimer: 0, RefreshTimeout: 5 * time.Minute, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // Configure the type var retries uint32 @@ -611,14 +615,16 @@ func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) { func TestCacheGet_noIndexSetsOne(t *testing.T) { t.Parallel() - typ := TestType(t) + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + SupportsBlocking: true, + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 5 * time.Minute, + }) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 5 * time.Minute, - }) + c.RegisterType("t", typ) // Simulate "well behaved" RPC with no data yet but returning 1 { @@ -671,15 +677,17 @@ func TestCacheGet_fetchTimeout(t *testing.T) { require := require.New(t) - typ := TestType(t) + typ := &MockType{} + timeout := 10 * time.Minute + typ.On("RegisterOptions").Return(RegisterOptions{ + RefreshTimeout: timeout, + SupportsBlocking: true, + }) defer typ.AssertExpectations(t) c := TestCache(t) // Register the type with a timeout - timeout := 10 * time.Minute - c.RegisterType("t", typ, &RegisterOptions{ - RefreshTimeout: timeout, - }) + c.RegisterType("t", typ) // Configure the type var actual time.Duration @@ -705,14 +713,15 @@ func TestCacheGet_expire(t *testing.T) { require := require.New(t) - typ := TestType(t) + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + LastGetTTL: 400 * time.Millisecond, + }) defer typ.AssertExpectations(t) c := TestCache(t) // Register the type with a timeout - c.RegisterType("t", typ, &RegisterOptions{ - LastGetTTL: 400 * time.Millisecond, - }) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 42}, nil).Times(2) @@ -760,14 +769,15 @@ func TestCacheGet_expireResetGet(t *testing.T) { require := require.New(t) - typ := TestType(t) + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + LastGetTTL: 150 * time.Millisecond, + }) defer typ.AssertExpectations(t) c := TestCache(t) // Register the type with a timeout - c.RegisterType("t", typ, &RegisterOptions{ - LastGetTTL: 150 * time.Millisecond, - }) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 42}, nil).Times(2) @@ -821,8 +831,8 @@ func TestCacheGet_duplicateKeyDifferentType(t *testing.T) { defer typ2.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, nil) - c.RegisterType("t2", typ2, nil) + c.RegisterType("t", typ) + c.RegisterType("t2", typ2) // Configure the types typ.Static(FetchResult{Value: 100}, nil) @@ -863,7 +873,7 @@ func TestCacheGet_partitionDC(t *testing.T) { t.Parallel() c := TestCache(t) - c.RegisterType("t", &testPartitionType{}, nil) + c.RegisterType("t", &testPartitionType{}) // Perform multiple gets getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ @@ -882,7 +892,7 @@ func TestCacheGet_partitionToken(t *testing.T) { t.Parallel() c := TestCache(t) - c.RegisterType("t", &testPartitionType{}, nil) + c.RegisterType("t", &testPartitionType{}) // Perform multiple gets getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ @@ -907,8 +917,10 @@ func (t *testPartitionType) Fetch(opts FetchOptions, r Request) (FetchResult, er }, nil } -func (t *testPartitionType) SupportsBlocking() bool { - return true +func (t *testPartitionType) RegisterOptions() RegisterOptions { + return RegisterOptions{ + SupportsBlocking: true, + } } // Test that background refreshing reports correct Age in failure and happy @@ -918,14 +930,15 @@ func TestCacheGet_refreshAge(t *testing.T) { require := require.New(t) - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: true, RefreshTimer: 0, RefreshTimeout: 5 * time.Minute, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // Configure the type var index, shouldFail uint64 @@ -1035,13 +1048,14 @@ func TestCacheGet_nonRefreshAge(t *testing.T) { require := require.New(t) - typ := TestType(t) - defer typ.AssertExpectations(t) - c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ Refresh: false, LastGetTTL: 100 * time.Millisecond, }) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) // Configure the type var index uint64 @@ -1121,7 +1135,7 @@ func TestCacheGet_nonBlockingType(t *testing.T) { typ := TestTypeNonBlocking(t) c := TestCache(t) - c.RegisterType("t", typ, nil) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once() diff --git a/agent/cache/mock_Request.go b/agent/cache/mock_Request.go index e3abd15159..c5af589241 100644 --- a/agent/cache/mock_Request.go +++ b/agent/cache/mock_Request.go @@ -1,4 +1,5 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. + package cache import mock "github.com/stretchr/testify/mock" diff --git a/agent/cache/mock_Type.go b/agent/cache/mock_Type.go index 64764447e3..7c39fca329 100644 --- a/agent/cache/mock_Type.go +++ b/agent/cache/mock_Type.go @@ -1,4 +1,5 @@ // Code generated by mockery v1.0.0. DO NOT EDIT. + package cache import mock "github.com/stretchr/testify/mock" @@ -29,15 +30,15 @@ func (_m *MockType) Fetch(_a0 FetchOptions, _a1 Request) (FetchResult, error) { return r0, r1 } -// SupportsBlocking provides a mock function with given fields: -func (_m *MockType) SupportsBlocking() bool { +// RegisterOptions provides a mock function with given fields: +func (_m *MockType) RegisterOptions() RegisterOptions { ret := _m.Called() - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { + var r0 RegisterOptions + if rf, ok := ret.Get(0).(func() RegisterOptions); ok { r0 = rf() } else { - r0 = ret.Get(0).(bool) + r0 = ret.Get(0).(RegisterOptions) } return r0 diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 10acc14dfd..47180dc3a6 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -96,20 +96,21 @@ func TestRequest(t testing.T, info RequestInfo) *MockRequest { return req } -// TestType returns a MockType that can be used to setup expectations -// on data fetching. +// TestType returns a MockType that sets default RegisterOptions. func TestType(t testing.T) *MockType { - return testTypeInternal(t, true) + typ := &MockType{} + typ.On("RegisterOptions").Return(RegisterOptions{ + SupportsBlocking: true, + }) + return typ } // TestTypeNonBlocking returns a MockType that returns false to SupportsBlocking. func TestTypeNonBlocking(t testing.T) *MockType { - return testTypeInternal(t, false) -} - -func testTypeInternal(t testing.T, enableBlocking bool) *MockType { typ := &MockType{} - typ.On("SupportsBlocking").Return(enableBlocking).Maybe() + typ.On("RegisterOptions").Return(RegisterOptions{ + SupportsBlocking: false, + }) return typ } diff --git a/agent/cache/type.go b/agent/cache/type.go index a95b9bb3f0..deae8a573c 100644 --- a/agent/cache/type.go +++ b/agent/cache/type.go @@ -30,11 +30,9 @@ type Type interface { // metadata even when there is no result. Fetch(FetchOptions, Request) (FetchResult, error) - // SupportsBlocking should return true if the type supports blocking queries. - // Types that do not support blocking queries will not be able to use - // background refresh nor will the cache attempt blocking fetches if the - // client requests them with MinIndex. - SupportsBlocking() bool + // RegisterOptions are used when the type is registered to configure the + // behaviour of cache entries for this type. + RegisterOptions() RegisterOptions } // FetchOptions are various settable options when a Fetch is called. diff --git a/agent/cache/watch.go b/agent/cache/watch.go index ed22a70d39..1ed3ccad13 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -64,7 +64,7 @@ func (c *Cache) Notify( return fmt.Errorf("unknown type in cache: %s", t) } - if tEntry.Type.SupportsBlocking() { + if tEntry.Opts.SupportsBlocking { go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) return nil } diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 9e65abf19f..bbf73503fd 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -17,11 +17,10 @@ func TestCacheNotify(t *testing.T) { t.Parallel() typ := TestType(t) + typ.On("RegisterOptions").Return(RegisterOptions{}) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ - Refresh: false, - }) + c.RegisterType("t", typ) // Setup triggers to control when "updates" should be delivered trigger := make([]chan time.Time, 5) @@ -167,9 +166,7 @@ func TestCacheNotifyPolling(t *testing.T) { typ := TestTypeNonBlocking(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ - Refresh: false, - }) + c.RegisterType("t", typ) // Configure the type typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) { @@ -280,11 +277,10 @@ func TestCacheWatch_ErrorBackoff(t *testing.T) { t.Parallel() typ := TestType(t) + typ.On("RegisterOptions").Return(RegisterOptions{}) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ - Refresh: false, - }) + c.RegisterType("t", typ) // Configure the type var retries uint32 @@ -345,9 +341,7 @@ func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) { typ := TestTypeNonBlocking(t) defer typ.AssertExpectations(t) c := TestCache(t) - c.RegisterType("t", typ, &RegisterOptions{ - Refresh: false, - }) + c.RegisterType("t", typ) // Configure the type var retries uint32 diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 2b08301ea2..aa0b815ccd 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -50,35 +50,13 @@ func NewTestCacheTypes(t testing.T) *TestCacheTypes { // proxycfg will watch suitable for testing a proxycfg.State or Manager. func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache { c := cache.TestCache(t) - c.RegisterType(cachetype.ConnectCARootName, types.roots, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 10 * time.Minute, - }) - c.RegisterType(cachetype.ConnectCALeafName, types.leaf, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 10 * time.Minute, - }) - c.RegisterType(cachetype.IntentionMatchName, types.intentions, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 10 * time.Minute, - }) - c.RegisterType(cachetype.HealthServicesName, types.health, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 10 * time.Minute, - }) - c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{ - Refresh: false, - }) - c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{ - Refresh: true, - RefreshTimer: 0, - RefreshTimeout: 10 * time.Minute, - }) - c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks, &cache.RegisterOptions{}) + c.RegisterType(cachetype.ConnectCARootName, types.roots) + c.RegisterType(cachetype.ConnectCALeafName, types.leaf) + c.RegisterType(cachetype.IntentionMatchName, types.intentions) + c.RegisterType(cachetype.HealthServicesName, types.health) + c.RegisterType(cachetype.PreparedQueryName, types.query) + c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain) + c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks) return c } @@ -1221,7 +1199,10 @@ func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Reques }, nil } -// SupportsBlocking implements cache.Type -func (ct *ControllableCacheType) SupportsBlocking() bool { - return ct.blocking +func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions { + return cache.RegisterOptions{ + Refresh: ct.blocking, + SupportsBlocking: ct.blocking, + RefreshTimeout: 10 * time.Minute, + } }