diff --git a/agent/agent.go b/agent/agent.go index 86917c53d8..44dfa3d329 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -626,8 +626,8 @@ func (a *Agent) Start(ctx context.Context) error { // Start the proxy config manager. a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ - Cache: a.cache, - Health: a.rpcClientHealth, + Cache: &proxycfg.CacheWrapper{Cache: a.cache}, + Health: &proxycfg.HealthWrapper{Health: a.rpcClientHealth}, Logger: a.logger.Named(logging.ProxyConfig), State: a.State, Tokens: a.baseDeps.Tokens, diff --git a/agent/cache/watch.go b/agent/cache/watch.go index f50e280783..f99f85c04b 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -23,6 +23,9 @@ type UpdateEvent struct { Err error } +// Callback is the function type accepted by NotifyCallback. +type Callback func(ctx context.Context, event UpdateEvent) + // Notify registers a desire to be updated about changes to a cache result. // // It is a helper that abstracts code from performing their own "blocking" query @@ -56,6 +59,24 @@ func (c *Cache) Notify( r Request, correlationID string, ch chan<- UpdateEvent, +) error { + return c.NotifyCallback(ctx, t, r, correlationID, func(ctx context.Context, event UpdateEvent) { + select { + case ch <- event: + case <-ctx.Done(): + } + }) +} + +// NotifyCallback allows you to receive notifications about changes to a cache +// result in the same way as Notify, but accepts a callback function instead of +// a channel. +func (c *Cache) NotifyCallback( + ctx context.Context, + t string, + r Request, + correlationID string, + cb Callback, ) error { c.typesLock.RLock() tEntry, ok := c.types[t] @@ -65,7 +86,7 @@ func (c *Cache) Notify( } if tEntry.Opts.SupportsBlocking { - go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) + go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb) return nil } @@ -73,11 +94,11 @@ func (c *Cache) Notify( if info.MaxAge == 0 { return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") } - go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch) + go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, cb) return nil } -func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { +func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) { // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) @@ -101,12 +122,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati // Check the index of the value returned in the cache entry to be sure it // changed if index == 0 || index < meta.Index { - u := UpdateEvent{correlationID, res, meta, err} - select { - case ch <- u: - case <-ctx.Done(): - return - } + cb(ctx, UpdateEvent{correlationID, res, meta, err}) // Update index for next request index = meta.Index @@ -143,7 +159,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati } } -func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) { +func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, cb Callback) { index := uint64(0) failures := uint(0) @@ -166,12 +182,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio // Check for a change in the value or an index change if index < meta.Index || !reflect.DeepEqual(lastValue, res) { - u := UpdateEvent{correlationID, res, meta, err} - select { - case ch <- u: - case <-ctx.Done(): - return - } + cb(ctx, UpdateEvent{correlationID, res, meta, err}) // Update index and lastValue lastValue = res diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 260e46dcd7..1564e19e1c 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/discoverychain" @@ -220,7 +219,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e return snap, nil } -func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { +func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { if u.Err != nil { return fmt.Errorf("error filling agent cache: %v", u.Err) } diff --git a/agent/proxycfg/glue.go b/agent/proxycfg/glue.go new file mode 100644 index 0000000000..8a25da2f81 --- /dev/null +++ b/agent/proxycfg/glue.go @@ -0,0 +1,60 @@ +// TODO(agentless): these glue types belong in the agent package, but moving +// them is a little tricky because the proxycfg tests use them. It should be +// easier to break apart once we no longer depend on cache.Notify directly. +package proxycfg + +import ( + "context" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/rpcclient/health" + "github.com/hashicorp/consul/agent/structs" +) + +// HealthWrapper wraps health.Client so that the rest of the proxycfg package +// doesn't need to reference cache.UpdateEvent (it will be extracted into a +// shared library in the future). +type HealthWrapper struct { + Health *health.Client +} + +func (w *HealthWrapper) Notify( + ctx context.Context, + req structs.ServiceSpecificRequest, + correlationID string, + ch chan<- UpdateEvent, +) error { + return w.Health.Notify(ctx, req, correlationID, dispatchCacheUpdate(ctx, ch)) +} + +// CacheWrapper wraps cache.Cache so that the rest of the proxycfg package +// doesn't need to reference cache.UpdateEvent (it will be extracted into a +// shared library in the future). +type CacheWrapper struct { + Cache *cache.Cache +} + +func (w *CacheWrapper) Notify( + ctx context.Context, + t string, + req cache.Request, + correlationID string, + ch chan<- UpdateEvent, +) error { + return w.Cache.NotifyCallback(ctx, t, req, correlationID, dispatchCacheUpdate(ctx, ch)) +} + +func dispatchCacheUpdate(ctx context.Context, ch chan<- UpdateEvent) cache.Callback { + return func(ctx context.Context, e cache.UpdateEvent) { + u := UpdateEvent{ + CorrelationID: e.CorrelationID, + Result: e.Result, + Err: e.Err, + } + + select { + case ch <- u: + case <-ctx.Done(): + } + } +} diff --git a/agent/proxycfg/ingress_gateway.go b/agent/proxycfg/ingress_gateway.go index 311b5ca85b..5ae2998663 100644 --- a/agent/proxycfg/ingress_gateway.go +++ b/agent/proxycfg/ingress_gateway.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) @@ -70,7 +69,7 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, return snap, nil } -func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { +func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { if u.Err != nil { return fmt.Errorf("error filling agent cache: %v", u.Err) } diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index d5d102b1ef..988d7f876f 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -6,7 +6,6 @@ import ( "github.com/hashicorp/go-hclog" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" @@ -59,7 +58,7 @@ type Manager struct { type ManagerConfig struct { // Cache is the agent's cache instance that can be used to retrieve, store and // monitor state for the proxies. - Cache *cache.Cache + Cache CacheNotifier // Health provides service health updates on a notification channel. Health Health // state is the agent's local state to be watched for new proxy registrations. diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 9b62897033..82762fa596 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -376,8 +376,8 @@ func testManager_BasicLifecycle( // Create manager m, err := NewManager(ManagerConfig{ - Cache: c, - Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + Cache: &CacheWrapper{c}, + Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}}, State: state, Source: source, Logger: logger, @@ -509,7 +509,7 @@ func TestManager_deliverLatest(t *testing.T) { // None of these need to do anything to test this method just be valid logger := testutil.Logger(t) cfg := ManagerConfig{ - Cache: cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2}), + Cache: &CacheWrapper{cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})}, State: local.NewState(local.Config{}, logger, &token.Store{}), Source: &structs.QuerySource{ Node: "node1", @@ -581,8 +581,8 @@ func TestManager_SyncState_DefaultToken(t *testing.T) { state.TriggerSyncChanges = func() {} m, err := NewManager(ManagerConfig{ - Cache: c, - Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + Cache: &CacheWrapper{c}, + Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}}, State: state, Tokens: tokens, Source: &structs.QuerySource{Datacenter: "dc1"}, @@ -626,8 +626,8 @@ func TestManager_SyncState_No_Notify(t *testing.T) { state.TriggerSyncChanges = func() {} m, err := NewManager(ManagerConfig{ - Cache: c, - Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + Cache: &CacheWrapper{c}, + Health: &HealthWrapper{&health.Client{Cache: c, CacheName: cachetype.HealthServicesName}}, State: state, Tokens: tokens, Source: &structs.QuerySource{Datacenter: "dc1"}, @@ -673,7 +673,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { // update the leaf certs roots, issuedCert := TestCerts(t) - notifyCH <- cache.UpdateEvent{ + notifyCH <- UpdateEvent{ CorrelationID: leafWatchID, Result: issuedCert, Err: nil, @@ -688,7 +688,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { } // update the root certs - notifyCH <- cache.UpdateEvent{ + notifyCH <- UpdateEvent{ CorrelationID: rootsWatchID, Result: roots, Err: nil, @@ -704,7 +704,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { } // update the mesh config entry - notifyCH <- cache.UpdateEvent{ + notifyCH <- UpdateEvent{ CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{}, Err: nil, @@ -723,7 +723,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { readEvent <- true // update the intentions - notifyCH <- cache.UpdateEvent{ + notifyCH <- UpdateEvent{ CorrelationID: intentionsWatchID, Result: &structs.IndexedIntentionMatches{}, Err: nil, @@ -741,7 +741,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { // send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case for i := 0; i < 2; i++ { time.Sleep(250 * time.Millisecond) - notifyCH <- cache.UpdateEvent{ + notifyCH <- UpdateEvent{ CorrelationID: leafWatchID, Result: issuedCert, Err: nil, diff --git a/agent/proxycfg/mesh_gateway.go b/agent/proxycfg/mesh_gateway.go index 5e5fa131a9..fb212a2f40 100644 --- a/agent/proxycfg/mesh_gateway.go +++ b/agent/proxycfg/mesh_gateway.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" @@ -119,7 +118,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error return nil } -func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { +func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { if u.Err != nil { return fmt.Errorf("error filling agent cache: %v", u.Err) } diff --git a/agent/proxycfg/mesh_gateway_oss.go b/agent/proxycfg/mesh_gateway_oss.go index b32884452e..143eff76fe 100644 --- a/agent/proxycfg/mesh_gateway_oss.go +++ b/agent/proxycfg/mesh_gateway_oss.go @@ -7,14 +7,12 @@ import ( "context" "github.com/hashicorp/go-hclog" - - "github.com/hashicorp/consul/agent/cache" ) func (s *handlerMeshGateway) initializeEntWatches(_ context.Context) error { return nil } -func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ cache.UpdateEvent, _ *ConfigSnapshot) error { +func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ UpdateEvent, _ *ConfigSnapshot) error { return nil } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index eed26d2f90..d5b1a7cf3e 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -17,13 +17,21 @@ import ( "github.com/hashicorp/consul/logging" ) +// UpdateEvent contains new data for a resource we are subscribed to (e.g. an +// agent cache entry). +type UpdateEvent struct { + CorrelationID string + Result interface{} + Err error +} + type CacheNotifier interface { Notify(ctx context.Context, t string, r cache.Request, - correlationID string, ch chan<- cache.UpdateEvent) error + correlationID string, ch chan<- UpdateEvent) error } type Health interface { - Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error + Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error } const ( @@ -72,7 +80,7 @@ type state struct { // in Watch. cancel func() - ch chan cache.UpdateEvent + ch chan UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot } @@ -153,7 +161,7 @@ func newState(ns *structs.NodeService, token string, config stateConfig) (*state // conservative to handle larger numbers of upstreams correctly but gives // some head room for normal operation to be non-blocking in most typical // cases. - ch := make(chan cache.UpdateEvent, 10) + ch := make(chan UpdateEvent, 10) s, err := newServiceInstanceFromNodeService(ns, token) if err != nil { @@ -175,7 +183,7 @@ func newState(ns *structs.NodeService, token string, config stateConfig) (*state }, nil } -func newKindHandler(config stateConfig, s serviceInstance, ch chan cache.UpdateEvent) (kindHandler, error) { +func newKindHandler(config stateConfig, s serviceInstance, ch chan UpdateEvent) (kindHandler, error) { var handler kindHandler h := handlerState{stateConfig: config, serviceInstance: s, ch: ch} @@ -228,7 +236,7 @@ func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (s type kindHandler interface { initialize(ctx context.Context) (ConfigSnapshot, error) - handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error + handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error } // Watch initialized watches on all necessary cache data for the current proxy @@ -261,7 +269,7 @@ func (s *state) Close() error { type handlerState struct { stateConfig // TODO: un-embed serviceInstance // TODO: un-embed - ch chan cache.UpdateEvent + ch chan UpdateEvent } func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) ConfigSnapshot { @@ -450,7 +458,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C type gatewayWatchOpts struct { notifier CacheNotifier - notifyCh chan cache.UpdateEvent + notifyCh chan UpdateEvent source structs.QuerySource token string key GatewayKey diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 29a176e0c8..a0298bb6b0 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -126,7 +126,7 @@ func TestStateChanged(t *testing.T) { type testCacheNotifierRequest struct { cacheType string request cache.Request - ch chan<- cache.UpdateEvent + cb func(UpdateEvent) } type testCacheNotifier struct { @@ -140,9 +140,23 @@ func newTestCacheNotifier() *testCacheNotifier { } } -func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- cache.UpdateEvent) error { +func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Request, correlationId string, ch chan<- UpdateEvent) error { cn.lock.Lock() - cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, ch} + cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { ch <- event }} + cn.lock.Unlock() + return nil +} + +// NotifyCallback satisfies the health.CacheGetter interface. +func (cn *testCacheNotifier) NotifyCallback(ctx context.Context, t string, r cache.Request, correlationId string, cb cache.Callback) error { + cn.lock.Lock() + cn.notifiers[correlationId] = testCacheNotifierRequest{t, r, func(event UpdateEvent) { + cb(ctx, cache.UpdateEvent{ + CorrelationID: event.CorrelationID, + Result: event.Result, + Err: event.Err, + }) + }} cn.lock.Unlock() return nil } @@ -159,20 +173,16 @@ func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId stri return req } -func (cn *testCacheNotifier) getChanForCorrelationId(t testing.TB, correlationId string) chan<- cache.UpdateEvent { +func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event UpdateEvent) { req := cn.getNotifierRequest(t, correlationId) - require.NotNil(t, req.ch) - return req.ch -} - -func (cn *testCacheNotifier) sendNotification(t testing.TB, correlationId string, event cache.UpdateEvent) { - cn.getChanForCorrelationId(t, correlationId) <- event + require.NotNil(t, req.cb) + req.cb(event) } func (cn *testCacheNotifier) verifyWatch(t testing.TB, correlationId string) (string, cache.Request) { // t.Logf("Watches: %+v", cn.notifiers) req := cn.getNotifierRequest(t, correlationId) - require.NotNil(t, req.ch) + require.NotNil(t, req.cb) return req.cacheType, req.request } @@ -348,7 +358,7 @@ func genVerifyConfigEntryWatch(expectedKind, expectedName, expectedDatacenter st } } -func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent { +func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) UpdateEvent { e := &structs.IngressGatewayConfigEntry{ TLS: structs.GatewayTLSConfig{ Enabled: gwTLS, @@ -371,7 +381,7 @@ func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent { } } - return cache.UpdateEvent{ + return UpdateEvent{ CorrelationID: gatewayConfigWatchID, Result: &structs.ConfigEntryResponse{ Entry: e, @@ -420,8 +430,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { // TODO(peering): NewUpstreamIDFromServiceName should take a PeerName extApiUID.Peer = "peer-a" - rootWatchEvent := func() cache.UpdateEvent { - return cache.UpdateEvent{ + rootWatchEvent := func() UpdateEvent { + return UpdateEvent{ CorrelationID: rootsWatchID, Result: indexedRoots, Err: nil, @@ -430,7 +440,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { type verificationStage struct { requiredWatches map[string]verifyWatchRequest - events []cache.UpdateEvent + events []UpdateEvent verifySnapshot func(t testing.TB, snap *ConfigSnapshot) } @@ -562,7 +572,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: leafWatchID, @@ -752,7 +762,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -768,7 +778,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: serviceListWatchID, Result: &structs.IndexedServiceList{ @@ -807,7 +817,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { serviceListWatchID: genVerifyListServicesWatch("dc1"), datacentersWatchID: verifyDatacentersWatch, }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: serviceListWatchID, @@ -826,7 +836,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: serviceListWatchID, Result: &structs.IndexedServiceList{ @@ -845,7 +855,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "mesh-gateway:dc4", Result: &structs.IndexedNodesWithGateways{ @@ -889,7 +899,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: federationStateListGatewaysWatchID, Result: &structs.DatacenterIndexedCheckServiceNodes{ @@ -958,7 +968,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: meshConfigEntryID, @@ -971,7 +981,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ ingressConfigWatchEvent(false, false), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { @@ -981,7 +991,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1022,7 +1032,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ leafWatchID: genVerifyLeafWatch("ingress-gateway", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: leafWatchID, Result: issuedCert, @@ -1044,7 +1054,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Datacenter: "dc1", }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "discovery-chain:" + apiUID.String(), Result: &structs.DiscoveryChainResponse{ @@ -1062,7 +1072,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -1121,7 +1131,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: meshConfigEntryID, @@ -1169,7 +1179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { "*.ingress.dc1.alt.consul.", }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{}, @@ -1201,7 +1211,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { gatewayConfigWatchID: genVerifyConfigEntryWatch(structs.IngressGateway, "ingress-gateway", "dc1"), gatewayServicesWatchID: genVerifyGatewayServiceWatch("ingress-gateway", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: meshConfigEntryID, @@ -1262,7 +1272,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { "*.ingress.dc1.alt.consul.", }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{}, @@ -1302,7 +1312,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: meshConfigEntryID, @@ -1337,7 +1347,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID, "terminating-gateway", "", "dc1", false), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: meshConfigEntryID, @@ -1365,7 +1375,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1426,7 +1436,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "external-service:" + db.String(): genVerifyServiceWatch("db", "", "dc1", false), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "external-service:" + db.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -1471,7 +1481,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "external-service:" + api.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -1564,7 +1574,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "service-leaf:" + db.String(): genVerifyLeafWatch("db", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "service-leaf:" + db.String(), Result: issuedCert, @@ -1582,7 +1592,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ serviceIntentionsIDPrefix + db.String(): genVerifyIntentionWatch("db", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: serviceIntentionsIDPrefix + db.String(), Result: dbIxnMatch, @@ -1603,7 +1613,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ serviceConfigIDPrefix + db.String(): genVerifyResolvedConfigWatch("db", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: serviceConfigIDPrefix + db.String(), Result: dbConfig, @@ -1622,7 +1632,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "service-resolver:" + db.String(): genVerifyResolverWatch("db", "dc1", structs.ServiceResolver), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "service-resolver:" + db.String(), Result: dbResolver, @@ -1642,7 +1652,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1730,7 +1740,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: leafWatchID, @@ -1814,7 +1824,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, // Valid snapshot after roots, leaf, and intentions { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: leafWatchID, @@ -1857,7 +1867,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: intentionUpstreamsID, Result: &structs.IndexedServiceList{ @@ -1900,7 +1910,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ @@ -1918,7 +1928,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { requiredWatches: map[string]verifyWatchRequest{ "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -2069,7 +2079,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ @@ -2096,7 +2106,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { // Receive a new upstream target event without proxy1. - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -2177,7 +2187,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { // Receive a new upstream target event with a conflicting passthrough address - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -2259,7 +2269,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { // Event with no nodes should clean up addrs - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), Result: &structs.IndexedCheckServiceNodes{ @@ -2289,7 +2299,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: intentionUpstreamsID, Result: &structs.IndexedServiceList{ @@ -2382,7 +2392,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, // Valid snapshot after roots, leaf, and intentions { - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: leafWatchID, @@ -2428,7 +2438,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(), Result: &structs.DiscoveryChainResponse{ @@ -2465,7 +2475,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, }), }, - events: []cache.UpdateEvent{ + events: []UpdateEvent{ { CorrelationID: intentionUpstreamsID, Result: &structs.IndexedServiceList{ @@ -2557,7 +2567,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { // This time add the events - events: []cache.UpdateEvent{ + events: []UpdateEvent{ rootWatchEvent(), { CorrelationID: leafWatchID, @@ -2607,7 +2617,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { state, err := newState(&tc.ns, "", stateConfig{ logger: testutil.Logger(t), cache: cn, - health: &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}, + health: &HealthWrapper{&health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}}, source: &structs.QuerySource{ Datacenter: tc.sourceDC, }, diff --git a/agent/proxycfg/terminating_gateway.go b/agent/proxycfg/terminating_gateway.go index 592a454cac..b27dbe8c07 100644 --- a/agent/proxycfg/terminating_gateway.go +++ b/agent/proxycfg/terminating_gateway.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) @@ -68,7 +67,7 @@ func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnaps return snap, nil } -func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { +func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { if u.Err != nil { return fmt.Errorf("error filling agent cache: %v", u.Err) } diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index eb2ebfb0b2..ab311403e7 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -672,7 +672,7 @@ type noopCacheNotifier struct{} var _ CacheNotifier = (*noopCacheNotifier)(nil) -func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { +func (*noopCacheNotifier) Notify(_ context.Context, _ string, _ cache.Request, _ string, _ chan<- UpdateEvent) error { return nil } @@ -680,7 +680,7 @@ type noopHealth struct{} var _ Health = (*noopHealth)(nil) -func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- cache.UpdateEvent) error { +func (*noopHealth) Notify(_ context.Context, _ structs.ServiceSpecificRequest, _ string, _ chan<- UpdateEvent) error { return nil } @@ -698,7 +698,7 @@ func testConfigSnapshotFixture( ns *structs.NodeService, nsFn func(ns *structs.NodeService), serverSNIFn ServerSNIFunc, - updates []cache.UpdateEvent, + updates []UpdateEvent, ) *ConfigSnapshot { const token = "" @@ -750,15 +750,15 @@ func testConfigSnapshotFixture( return &snap } -func testSpliceEvents(base, extra []cache.UpdateEvent) []cache.UpdateEvent { +func testSpliceEvents(base, extra []UpdateEvent) []UpdateEvent { if len(extra) == 0 { return base } var ( - hasExtra = make(map[string]cache.UpdateEvent) + hasExtra = make(map[string]UpdateEvent) completeExtra = make(map[string]struct{}) - allEvents []cache.UpdateEvent + allEvents []UpdateEvent ) for _, e := range extra { diff --git a/agent/proxycfg/testing_connect_proxy.go b/agent/proxycfg/testing_connect_proxy.go index 61db728c2f..2e34e7d352 100644 --- a/agent/proxycfg/testing_connect_proxy.go +++ b/agent/proxycfg/testing_connect_proxy.go @@ -4,14 +4,13 @@ import ( "github.com/mitchellh/go-testing-interface" "github.com/stretchr/testify/assert" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" ) // TestConfigSnapshot returns a fully populated snapshot -func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUpdates []cache.UpdateEvent) *ConfigSnapshot { +func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUpdates []UpdateEvent) *ConfigSnapshot { roots, leaf := TestCerts(t) // no entries implies we'll get a default chain @@ -29,7 +28,7 @@ func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUp webSN = structs.ServiceIDString("web", nil) ) - baseEvents := []cache.UpdateEvent{ + baseEvents := []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -94,7 +93,7 @@ func TestConfigSnapshotDiscoveryChain( t testing.T, variation string, nsFn func(ns *structs.NodeService), - extraUpdates []cache.UpdateEvent, + extraUpdates []UpdateEvent, additionalEntries ...structs.ConfigEntry, ) *ConfigSnapshot { roots, leaf := TestCerts(t) @@ -108,7 +107,7 @@ func TestConfigSnapshotDiscoveryChain( webSN = structs.ServiceIDString("web", nil) ) - baseEvents := testSpliceEvents([]cache.UpdateEvent{ + baseEvents := testSpliceEvents([]UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -171,7 +170,7 @@ func TestConfigSnapshotExposeConfig(t testing.T, nsFn func(ns *structs.NodeServi webSN = structs.ServiceIDString("web", nil) ) - baseEvents := []cache.UpdateEvent{ + baseEvents := []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -252,7 +251,7 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot { }, Meta: nil, TaggedAddresses: nil, - }, nil, nil, []cache.UpdateEvent{ + }, nil, nil, []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, diff --git a/agent/proxycfg/testing_ingress_gateway.go b/agent/proxycfg/testing_ingress_gateway.go index b0f09449df..3512ec0f92 100644 --- a/agent/proxycfg/testing_ingress_gateway.go +++ b/agent/proxycfg/testing_ingress_gateway.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" @@ -21,7 +20,7 @@ func TestConfigSnapshotIngressGateway( variation string, nsFn func(ns *structs.NodeService), configFn func(entry *structs.IngressGatewayConfigEntry), - extraUpdates []cache.UpdateEvent, + extraUpdates []UpdateEvent, additionalEntries ...structs.ConfigEntry, ) *ConfigSnapshot { roots, placeholderLeaf := TestCerts(t) @@ -47,7 +46,7 @@ func TestConfigSnapshotIngressGateway( configFn(entry) } - baseEvents := []cache.UpdateEvent{ + baseEvents := []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -71,7 +70,7 @@ func TestConfigSnapshotIngressGateway( } if populateServices { - baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{{ CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ Services: []*structs.GatewayService{ @@ -155,7 +154,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -270,7 +269,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel_HTTP(t testing. }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -344,7 +343,7 @@ func TestConfigSnapshotIngressGatewaySDS_ServiceLevel(t testing.T) *ConfigSnapsh }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -434,7 +433,7 @@ func TestConfigSnapshotIngressGatewaySDS_ListenerAndServiceLevel(t testing.T) *C }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -519,7 +518,7 @@ func TestConfigSnapshotIngressGatewaySDS_MixedNoTLS(t testing.T) *ConfigSnapshot }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -601,7 +600,7 @@ func TestConfigSnapshotIngressGateway_MixedListeners(t testing.T) *ConfigSnapsho }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -717,7 +716,7 @@ func TestConfigSnapshotIngress_HTTPMultipleServices(t testing.T) *ConfigSnapshot }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -830,7 +829,7 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -893,7 +892,7 @@ func TestConfigSnapshotIngressGatewayWithChain( } var ( - updates []cache.UpdateEvent + updates []UpdateEvent configFn func(entry *structs.IngressGatewayConfigEntry) populateServices bool @@ -1088,7 +1087,7 @@ func TestConfigSnapshotIngressGatewayWithChain( fooEntMeta.PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", nil, entries...) - updates = []cache.UpdateEvent{ + updates = []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1218,7 +1217,7 @@ func TestConfigSnapshotIngressGateway_TLSMinVersionListenersGatewayDefaults(t te }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1336,7 +1335,7 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener(t testing.T) *ConfigSnap }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -1436,7 +1435,7 @@ func TestConfigSnapshotIngressGateway_TLSMixedMinVersionListeners(t testing.T) * }, }, } - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ diff --git a/agent/proxycfg/testing_mesh_gateway.go b/agent/proxycfg/testing_mesh_gateway.go index c374a9aad2..c5baaea2f3 100644 --- a/agent/proxycfg/testing_mesh_gateway.go +++ b/agent/proxycfg/testing_mesh_gateway.go @@ -6,11 +6,10 @@ import ( "github.com/mitchellh/go-testing-interface" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" ) -func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *structs.NodeService), extraUpdates []cache.UpdateEvent) *ConfigSnapshot { +func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *structs.NodeService), extraUpdates []UpdateEvent) *ConfigSnapshot { roots, _ := TestCerts(t) var ( @@ -38,7 +37,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st useFederationStates = false deleteCrossDCEntry = false case "service-subsets": - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: serviceResolversWatchID, Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -60,7 +59,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "service-subsets2": // TODO(rb): make this merge with 'service-subsets' - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: serviceResolversWatchID, Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -95,7 +94,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "default-service-subsets2": // TODO(rb): rename to strip the 2 when the prior is merged with 'service-subsets' - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: serviceResolversWatchID, Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -132,7 +131,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "ignore-extra-resolvers": - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: serviceResolversWatchID, Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -169,7 +168,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "service-timeouts": - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: serviceResolversWatchID, Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -192,7 +191,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "non-hash-lb-injected": - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: "service-resolvers", // serviceResolversWatchID Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -220,7 +219,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }, }) case "hash-lb-ignored": - extraUpdates = append(extraUpdates, cache.UpdateEvent{ + extraUpdates = append(extraUpdates, UpdateEvent{ CorrelationID: "service-resolvers", // serviceResolversWatchID Result: &structs.IndexedConfigEntries{ Kind: structs.ServiceResolver, @@ -253,7 +252,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st return nil } - baseEvents := []cache.UpdateEvent{ + baseEvents := []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -278,7 +277,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st } if populateServices || useFederationStates { - baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{ { CorrelationID: datacentersWatchID, Result: &[]string{"dc1", "dc2", "dc4", "dc6"}, @@ -291,7 +290,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st foo = structs.NewServiceName("foo", nil) bar = structs.NewServiceName("bar", nil) ) - baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{ { CorrelationID: "mesh-gateway:dc2", Result: &structs.IndexedNodesWithGateways{ @@ -349,7 +348,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st }) if deleteCrossDCEntry { - baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{ { // Have the cross-dc query mechanism not work for dc2 so // fedstates will infill. @@ -399,7 +398,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st } } - baseEvents = testSpliceEvents(baseEvents, []cache.UpdateEvent{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{ { CorrelationID: federationStateListGatewaysWatchID, Result: &structs.DatacenterIndexedCheckServiceNodes{ diff --git a/agent/proxycfg/testing_terminating_gateway.go b/agent/proxycfg/testing_terminating_gateway.go index 5907b01995..e7b8c7b459 100644 --- a/agent/proxycfg/testing_terminating_gateway.go +++ b/agent/proxycfg/testing_terminating_gateway.go @@ -3,8 +3,6 @@ package proxycfg import ( "github.com/mitchellh/go-testing-interface" - "github.com/hashicorp/consul/agent/cache" - agentcache "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" ) @@ -12,7 +10,7 @@ func TestConfigSnapshotTerminatingGateway( t testing.T, populateServices bool, nsFn func(ns *structs.NodeService), - extraUpdates []agentcache.UpdateEvent, + extraUpdates []UpdateEvent, ) *ConfigSnapshot { roots, _ := TestCerts(t) @@ -23,7 +21,7 @@ func TestConfigSnapshotTerminatingGateway( cache = structs.NewServiceName("cache", nil) ) - baseEvents := []agentcache.UpdateEvent{ + baseEvents := []UpdateEvent{ { CorrelationID: rootsWatchID, Result: roots, @@ -158,7 +156,7 @@ func TestConfigSnapshotTerminatingGateway( }, } - baseEvents = testSpliceEvents(baseEvents, []agentcache.UpdateEvent{ + baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{ { CorrelationID: gatewayServicesWatchID, Result: &structs.IndexedGatewayServices{ @@ -356,7 +354,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC cache = structs.NewServiceName("cache", nil) ) - events := []agentcache.UpdateEvent{ + events := []UpdateEvent{ { CorrelationID: serviceResolverIDPrefix + web.String(), Result: &structs.ConfigEntryResponse{ @@ -384,7 +382,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC } if alsoAdjustCache { - events = testSpliceEvents(events, []agentcache.UpdateEvent{ + events = testSpliceEvents(events, []UpdateEvent{ { CorrelationID: serviceResolverIDPrefix + cache.String(), Result: &structs.ConfigEntryResponse{ @@ -414,7 +412,7 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC func TestConfigSnapshotTerminatingGatewayDefaultServiceSubset(t testing.T) *ConfigSnapshot { web := structs.NewServiceName("web", nil) - return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ + return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{ { CorrelationID: serviceResolverIDPrefix + web.String(), Result: &structs.ConfigEntryResponse{ @@ -498,7 +496,7 @@ func testConfigSnapshotTerminatingGatewayLBConfig(t testing.T, variant string) * return nil } - return TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{ { CorrelationID: serviceConfigIDPrefix + web.String(), Result: &structs.ServiceConfigResponse{ @@ -521,7 +519,7 @@ func testConfigSnapshotTerminatingGatewayLBConfig(t testing.T, variant string) * } func TestConfigSnapshotTerminatingGatewaySNI(t testing.T) *ConfigSnapshot { - return TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{ { CorrelationID: "gateway-services", Result: &structs.IndexedGatewayServices{ @@ -550,7 +548,7 @@ func TestConfigSnapshotTerminatingGatewayHostnameSubsets(t testing.T) *ConfigSna cache = structs.NewServiceName("cache", nil) ) - return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ + return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{ { CorrelationID: serviceResolverIDPrefix + api.String(), Result: &structs.ConfigEntryResponse{ @@ -600,7 +598,7 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf notfound = structs.NewServiceName("notfound", nil) ) - return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ + return TestConfigSnapshotTerminatingGateway(t, true, nil, []UpdateEvent{ { CorrelationID: serviceResolverIDPrefix + web.String(), Result: &structs.ConfigEntryResponse{ @@ -648,9 +646,9 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf }) } -func TestConfigSnapshotTerminatingGatewayWithLambdaService(t testing.T, extraUpdateEvents ...agentcache.UpdateEvent) *ConfigSnapshot { +func TestConfigSnapshotTerminatingGatewayWithLambdaService(t testing.T, extraUpdateEvents ...UpdateEvent) *ConfigSnapshot { web := structs.NewServiceName("web", nil) - updateEvents := append(extraUpdateEvents, agentcache.UpdateEvent{ + updateEvents := append(extraUpdateEvents, UpdateEvent{ CorrelationID: serviceConfigIDPrefix + web.String(), Result: &structs.ServiceConfigResponse{ ProxyConfig: map[string]interface{}{"protocol": "http"}, @@ -669,7 +667,7 @@ func TestConfigSnapshotTerminatingGatewayWithLambdaServiceAndServiceResolvers(t web := structs.NewServiceName("web", nil) return TestConfigSnapshotTerminatingGatewayWithLambdaService(t, - agentcache.UpdateEvent{ + UpdateEvent{ CorrelationID: serviceResolverIDPrefix + web.String(), Result: &structs.ConfigEntryResponse{ Entry: &structs.ServiceResolverConfigEntry{ diff --git a/agent/proxycfg/testing_tproxy.go b/agent/proxycfg/testing_tproxy.go index e593a51e93..b93e6c970b 100644 --- a/agent/proxycfg/testing_tproxy.go +++ b/agent/proxycfg/testing_tproxy.go @@ -5,7 +5,6 @@ import ( "github.com/mitchellh/go-testing-interface" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" @@ -28,7 +27,7 @@ func TestConfigSnapshotTransparentProxy(t testing.T) *ConfigSnapshot { return TestConfigSnapshot(t, func(ns *structs.NodeService) { ns.Proxy.Mode = structs.ProxyModeTransparent - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{ @@ -141,7 +140,7 @@ func TestConfigSnapshotTransparentProxyHTTPUpstream(t testing.T) *ConfigSnapshot return TestConfigSnapshot(t, func(ns *structs.NodeService) { ns.Proxy.Mode = structs.ProxyModeTransparent - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{ @@ -245,7 +244,7 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con return TestConfigSnapshot(t, func(ns *structs.NodeService) { ns.Proxy.Mode = structs.ProxyModeTransparent - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{ @@ -335,7 +334,7 @@ func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot return TestConfigSnapshot(t, func(ns *structs.NodeService) { ns.Proxy.Mode = structs.ProxyModeTransparent - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{ @@ -473,7 +472,7 @@ func TestConfigSnapshotTransparentProxyTerminatingGatewayCatalogDestinationsOnly return TestConfigSnapshot(t, func(ns *structs.NodeService) { ns.Proxy.Mode = structs.ProxyModeTransparent - }, []cache.UpdateEvent{ + }, []UpdateEvent{ { CorrelationID: meshConfigEntryID, Result: &structs.ConfigEntryResponse{ diff --git a/agent/proxycfg/testing_upstreams.go b/agent/proxycfg/testing_upstreams.go index c97ac7a4ff..2d80c0968d 100644 --- a/agent/proxycfg/testing_upstreams.go +++ b/agent/proxycfg/testing_upstreams.go @@ -5,7 +5,6 @@ import ( "github.com/mitchellh/go-testing-interface" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" @@ -16,7 +15,7 @@ func setupTestVariationConfigEntriesAndSnapshot( variation string, upstreams structs.Upstreams, additionalEntries ...structs.ConfigEntry, -) []cache.UpdateEvent { +) []UpdateEvent { var ( dbUpstream = upstreams[0] @@ -25,7 +24,7 @@ func setupTestVariationConfigEntriesAndSnapshot( dbChain := setupTestVariationDiscoveryChain(t, variation, additionalEntries...) - events := []cache.UpdateEvent{ + events := []UpdateEvent{ { CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ @@ -46,14 +45,14 @@ func setupTestVariationConfigEntriesAndSnapshot( case "simple": case "external-sni": case "failover": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:fail.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesAlternate(t), }, }) case "failover-through-remote-gateway-triggered": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatus(t, "critical"), @@ -61,26 +60,26 @@ func setupTestVariationConfigEntriesAndSnapshot( }) fallthrough case "failover-through-remote-gateway": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesDC2(t), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "mesh-gateway:dc2:" + dbUID.String(), Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC2(t), }, }) case "failover-through-double-remote-gateway-triggered": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatus(t, "critical"), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatusDC2(t, "critical"), @@ -88,26 +87,26 @@ func setupTestVariationConfigEntriesAndSnapshot( }) fallthrough case "failover-through-double-remote-gateway": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesDC2(t), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "mesh-gateway:dc2:" + dbUID.String(), Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC2(t), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "mesh-gateway:dc3:" + dbUID.String(), Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC3(t), }, }) case "failover-through-local-gateway-triggered": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatus(t, "critical"), @@ -115,26 +114,26 @@ func setupTestVariationConfigEntriesAndSnapshot( }) fallthrough case "failover-through-local-gateway": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesDC2(t), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "mesh-gateway:dc1:" + dbUID.String(), Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC1(t), }, }) case "failover-through-double-local-gateway-triggered": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatus(t, "critical"), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc2:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesInStatusDC2(t, "critical"), @@ -142,26 +141,26 @@ func setupTestVariationConfigEntriesAndSnapshot( }) fallthrough case "failover-through-double-local-gateway": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:db.default.default.dc3:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesDC2(t), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "mesh-gateway:dc1:" + dbUID.String(), Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC1(t), }, }) case "splitter-with-resolver-redirect-multidc": - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:v1.db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodes(t, "db"), }, }) - events = append(events, cache.UpdateEvent{ + events = append(events, UpdateEvent{ CorrelationID: "upstream-target:v2.db.default.default.dc2:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodesDC2(t), diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index c256cee1bd..1ed36c0e41 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -9,7 +9,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) @@ -18,7 +17,7 @@ type handlerUpstreams struct { handlerState } -func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { +func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { if u.Err != nil { return fmt.Errorf("error filling agent cache: %v", u.Err) } diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 828e284945..97ed1ae31b 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -26,12 +26,12 @@ type NetRPC interface { type CacheGetter interface { Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) - Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error + NotifyCallback(ctx context.Context, t string, r cache.Request, cID string, cb cache.Callback) error } type MaterializedViewStore interface { Get(ctx context.Context, req submatview.Request) (submatview.Result, error) - Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error + NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error } func (c *Client) ServiceNodes( @@ -91,14 +91,14 @@ func (c *Client) Notify( ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, - ch chan<- cache.UpdateEvent, + cb cache.Callback, ) error { if c.useStreaming(req) { sr := c.newServiceRequest(req) - return c.ViewStore.Notify(ctx, sr, correlationID, ch) + return c.ViewStore.NotifyCallback(ctx, sr, correlationID, cb) } - return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) + return c.Cache.NotifyCallback(ctx, c.CacheName, &req, correlationID, cb) } func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { diff --git a/agent/rpcclient/health/health_test.go b/agent/rpcclient/health/health_test.go index 9ac67805fd..0e6042cc9c 100644 --- a/agent/rpcclient/health/health_test.go +++ b/agent/rpcclient/health/health_test.go @@ -152,7 +152,7 @@ func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface return result, cache.ResultMeta{}, nil } -func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { +func (f *fakeCache) NotifyCallback(_ context.Context, t string, _ cache.Request, _ string, _ cache.Callback) error { f.calls = append(f.calls, t) return nil } @@ -175,7 +175,7 @@ func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatvi return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil } -func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error { +func (f *fakeViewStore) NotifyCallback(_ context.Context, req submatview.Request, _ string, _ cache.Callback) error { f.calls = append(f.calls, req) return nil } diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 0b27347934..242a0d70d7 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -149,6 +149,23 @@ func (s *Store) Notify( req Request, correlationID string, updateCh chan<- cache.UpdateEvent, +) error { + return s.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) { + select { + case updateCh <- event: + case <-ctx.Done(): + return + } + }) +} + +// NotifyCallback subscribes to updates of the entry identified by req in the +// same way as Notify, but accepts a callback function instead of a channel. +func (s *Store) NotifyCallback( + ctx context.Context, + req Request, + correlationID string, + cb cache.Callback, ) error { info := req.CacheInfo() key, materializer, err := s.readEntry(req) @@ -174,16 +191,11 @@ func (s *Store) Notify( } index = result.Index - u := cache.UpdateEvent{ + cb(ctx, cache.UpdateEvent{ CorrelationID: correlationID, Result: result.Value, Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached}, - } - select { - case updateCh <- u: - case <-ctx.Done(): - return - } + }) } }() return nil diff --git a/agent/submatview/store_integration_test.go b/agent/submatview/store_integration_test.go index 72eed5a5f9..b0efb91cbe 100644 --- a/agent/submatview/store_integration_test.go +++ b/agent/submatview/store_integration_test.go @@ -320,7 +320,12 @@ func (c *consumer) Consume(ctx context.Context, maxIndex uint64) error { group, cctx := errgroup.WithContext(ctx) group.Go(func() error { - return c.healthClient.Notify(cctx, req, "", updateCh) + return c.healthClient.Notify(cctx, req, "", func(ctx context.Context, event cache.UpdateEvent) { + select { + case updateCh <- event: + case <-ctx.Done(): + } + }) }) group.Go(func() error { var idx uint64 diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 8f4156aed7..9474a38ffb 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -13,7 +13,6 @@ import ( testinf "github.com/mitchellh/go-testing-interface" "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" @@ -41,7 +40,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-outgoing-min-version-auto", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -60,7 +59,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-outgoing-min-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -79,7 +78,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-outgoing-max-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -98,7 +97,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-outgoing-cipher-suites", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -406,7 +405,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "ingress-gateway-with-tls-outgoing-min-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -425,7 +424,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "ingress-gateway-with-tls-outgoing-max-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -444,7 +443,7 @@ func TestClustersFromSnapshot(t *testing.T) { { name: "ingress-gateway-with-tls-outgoing-cipher-suites", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index dcdb375527..295ae6d4b6 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -13,7 +13,6 @@ import ( testinf "github.com/mitchellh/go-testing-interface" "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" @@ -46,7 +45,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-outgoing-min-version-auto", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -65,7 +64,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-incoming-min-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -84,7 +83,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-incoming-max-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -103,7 +102,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "connect-proxy-with-tls-incoming-cipher-suites", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshot(t, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshot(t, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -173,7 +172,7 @@ func TestListenersFromSnapshot(t *testing.T) { func(ns *structs.NodeService) { ns.Proxy.Config["protocol"] = "http" }, - []cache.UpdateEvent{ + []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -580,7 +579,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "terminating-gateway-with-tls-incoming-min-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -599,7 +598,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "terminating-gateway-with-tls-incoming-max-version", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -618,7 +617,7 @@ func TestListenersFromSnapshot(t *testing.T) { { name: "terminating-gateway-with-tls-incoming-cipher-suites", create: func(t testinf.T) *proxycfg.ConfigSnapshot { - return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{ { CorrelationID: "mesh", Result: &structs.ConfigEntryResponse{ @@ -677,7 +676,7 @@ func TestListenersFromSnapshot(t *testing.T) { name: "terminating-gateway-no-api-cert", create: func(t testinf.T) *proxycfg.ConfigSnapshot { api := structs.NewServiceName("api", nil) - return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []cache.UpdateEvent{ + return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, []proxycfg.UpdateEvent{ { CorrelationID: "service-leaf:" + api.String(), // serviceLeafIDPrefix Result: nil, // tombstone this