diff --git a/agent/agent.go b/agent/agent.go index 201dc7aee6..00f488a339 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -377,8 +377,6 @@ func New(bd BaseDeps) (*Agent, error) { Cache: bd.Cache, NetRPC: &a, CacheName: cacheName, - // Temporarily until streaming supports all connect events - CacheNameConnect: cachetype.HealthServicesName, } a.serviceManager = NewServiceManager(&a) @@ -540,6 +538,7 @@ func (a *Agent) Start(ctx context.Context) error { // Start the proxy config manager. a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ Cache: a.cache, + Health: a.rpcClientHealth, Logger: a.logger.Named(logging.ProxyConfig), State: a.State, Source: &structs.QuerySource{ diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 99c0f36bca..871e49581f 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -202,9 +202,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), - // The connect topic is temporarily disabled until the correct events are - // created for terminating gateway changes. - //topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), + topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), } } diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index c703da4f06..4858d1f336 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -4,11 +4,12 @@ import ( "errors" "sync" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" ) var ( @@ -58,6 +59,8 @@ type ManagerConfig struct { // Cache is the agent's cache instance that can be used to retrieve, store and // monitor state for the proxies. Cache *cache.Cache + // Health provides service health updates on a notification channel. + Health Health // state is the agent's local state to be watched for new proxy registrations. State *local.State // source describes the current agent's identity, it's used directly for @@ -195,6 +198,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string // Set the necessary dependencies state.logger = m.Logger.With("service_id", sid.String()) state.cache = m.Cache + state.health = m.Health state.source = m.Source state.dnsConfig = m.DNSConfig state.intentionDefaultAllow = m.IntentionDefaultAllow diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 01b896d293..6d68342e4d 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/sdk/testutil" @@ -342,7 +343,13 @@ func testManager_BasicLifecycle( state.TriggerSyncChanges = func() {} // Create manager - m, err := NewManager(ManagerConfig{c, state, source, DNSConfig{}, logger, nil, false}) + m, err := NewManager(ManagerConfig{ + Cache: c, + Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + State: state, + Source: source, + Logger: logger, + }) require.NoError(err) // And run it diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 59d11a2f11..7eeeb03f38 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -9,13 +9,14 @@ import ( "strings" "time" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/copystructure" + "github.com/mitchellh/mapstructure" + "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" - "github.com/mitchellh/copystructure" - "github.com/mitchellh/mapstructure" ) type CacheNotifier interface { @@ -23,6 +24,10 @@ type CacheNotifier interface { correlationID string, ch chan<- cache.UpdateEvent) error } +type Health interface { + Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error +} + const ( coalesceTimeout = 200 * time.Millisecond rootsWatchID = "roots" @@ -54,6 +59,7 @@ type state struct { logger hclog.Logger source *structs.QuerySource cache CacheNotifier + health Health dnsConfig DNSConfig serverSNIFn ServerSNIFunc intentionDefaultAllow bool @@ -155,6 +161,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) { taggedAddresses: taggedAddresses, proxyCfg: proxyCfg, token: token, + // 10 is fairly arbitrary here but allow for the 3 mandatory and a // reasonable number of upstream watches to all deliver their initial // messages in parallel without blocking the cache.Notify loops. It's not a @@ -225,7 +232,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri var finalMeta structs.EnterpriseMeta finalMeta.Merge(entMeta) - return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + return s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{ Token: s.token, @@ -443,7 +450,7 @@ func (s *state) initWatchesMeshGateway() error { return err } - err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: structs.ConsulServiceName, @@ -969,7 +976,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config // Watch the health endpoint to discover endpoints for the service if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Service.Name, @@ -1267,7 +1274,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Name, diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 19a0ed22b4..4b2f024046 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -6,12 +6,14 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/consul/discoverychain" + "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" - "github.com/stretchr/testify/require" ) func TestStateChanged(t *testing.T) { @@ -143,6 +145,10 @@ func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Reque return nil } +func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) { + panic("Get: not implemented") +} + func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest { cn.lock.RLock() req, ok := cn.notifiers[correlationId] @@ -1521,6 +1527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // setup a new testing cache notifier cn := newTestCacheNotifier() state.cache = cn + state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName} // setup the local datacenter information state.source = &structs.QuerySource{ diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index adc85e83cc..259c5b5f97 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -12,8 +12,6 @@ type Client struct { Cache CacheGetter // CacheName to use for service health. CacheName string - // CacheNameConnect is the name of the cache to use for connect service health. - CacheNameConnect string } type NetRPC interface { @@ -22,6 +20,7 @@ type NetRPC interface { type CacheGetter interface { Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) + Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error } func (c *Client) ServiceNodes( @@ -54,12 +53,7 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } - cacheName := c.CacheName - if req.Connect { - cacheName = c.CacheNameConnect - } - - raw, md, err := c.Cache.Get(ctx, cacheName, &req) + raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) if err != nil { return out, md, err } @@ -71,3 +65,12 @@ func (c *Client) getServiceNodes( return *value, md, nil } + +func (c *Client) Notify( + ctx context.Context, + req structs.ServiceSpecificRequest, + correlationID string, + ch chan<- cache.UpdateEvent, +) error { + return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) +}