proxycfg: split state into kind-specific types

This commit extracts all the kind-specific logic into handler types, and
keeps the generic parts on the state struct. This change should make it
easier to add new kinds, and see the implementation of each kind more
clearly.
This commit is contained in:
Daniel Nephin 2020-12-23 18:03:30 -05:00
parent cd05df7157
commit 32c15d9a88
3 changed files with 205 additions and 203 deletions

View File

@ -189,14 +189,8 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
state.Close()
}
var err error
state, err = newState(ns, token)
if err != nil {
return err
}
// TODO: move to a function that translates ManagerConfig->stateConfig
state.stateConfig = stateConfig{
stateConfig := stateConfig{
logger: m.Logger.With("service_id", sid.String()),
cache: m.Cache,
health: m.Health,
@ -205,7 +199,13 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
intentionDefaultAllow: m.IntentionDefaultAllow,
}
if m.TLSConfigurator != nil {
state.serverSNIFn = m.TLSConfigurator.ServerSNI
stateConfig.serverSNIFn = m.TLSConfigurator.ServerSNI
}
var err error
state, err = newState(ns, token, stateConfig)
if err != nil {
return err
}
ch, err := state.Watch()

View File

@ -67,10 +67,9 @@ type stateConfig struct {
// connect-proxy service. When a proxy registration is changed, the entire state
// is discarded and a new one created.
type state struct {
// TODO: un-embedd once refactor is complete
stateConfig
// TODO: un-embed once refactor is complete
serviceInstance
logger hclog.Logger
serviceInstance serviceInstance
handler kindHandler
// cancel is set by Watch and called by Close to stop the goroutine started
// in Watch.
@ -136,24 +135,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
//
// The returned state needs its required dependencies to be set before Watch
// can be called.
func newState(ns *structs.NodeService, token string) (*state, error) {
switch ns.Kind {
case structs.ServiceKindConnectProxy:
case structs.ServiceKindTerminatingGateway:
case structs.ServiceKindMeshGateway:
case structs.ServiceKindIngressGateway:
default:
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
}
s, err := newServiceInstanceFromNodeService(ns, token)
if err != nil {
return nil, err
}
return &state{
serviceInstance: s,
func newState(ns *structs.NodeService, token string, config stateConfig) (*state, error) {
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
// reasonable number of upstream watches to all deliver their initial
// messages in parallel without blocking the cache.Notify loops. It's not a
@ -161,7 +143,34 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
// conservative to handle larger numbers of upstreams correctly but gives
// some head room for normal operation to be non-blocking in most typical
// cases.
ch: make(chan cache.UpdateEvent, 10),
ch := make(chan cache.UpdateEvent, 10)
s, err := newServiceInstanceFromNodeService(ns, token)
if err != nil {
return nil, err
}
var handler kindHandler
switch ns.Kind {
case structs.ServiceKindConnectProxy:
handler = &handlerConnectProxy{stateConfig: config, serviceInstance: s, ch: ch}
case structs.ServiceKindTerminatingGateway:
config.logger = config.logger.Named(logging.TerminatingGateway)
handler = &handlerTerminatingGateway{stateConfig: config, serviceInstance: s, ch: ch}
case structs.ServiceKindMeshGateway:
config.logger = config.logger.Named(logging.MeshGateway)
handler = &handlerMeshGateway{stateConfig: config, serviceInstance: s, ch: ch}
case structs.ServiceKindIngressGateway:
handler = &handlerIngressGateway{stateConfig: config, serviceInstance: s, ch: ch}
default:
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
}
return &state{
logger: config.logger.With("proxy", s.proxyID, "kind", s.kind),
serviceInstance: s,
handler: handler,
ch: ch,
snapCh: make(chan ConfigSnapshot, 1),
reqCh: make(chan chan *ConfigSnapshot, 1),
}, nil
@ -196,6 +205,11 @@ func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (s
}, nil
}
type kindHandler interface {
initialize(ctx context.Context) (ConfigSnapshot, error)
handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error
}
// Watch initialized watches on all necessary cache data for the current proxy
// registration state and returns a chan to observe updates to the
// ConfigSnapshot that contains all necessary config state. The chan is closed
@ -204,8 +218,7 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())
snap := s.initialConfigSnapshot()
err := s.initWatches(ctx, &snap)
snap, err := s.handler.initialize(ctx)
if err != nil {
s.cancel()
return nil, err
@ -224,23 +237,21 @@ func (s *state) Close() error {
return nil
}
// initWatches sets up the watches needed for the particular service
func (s *state) initWatches(ctx context.Context, snap *ConfigSnapshot) error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.initWatchesConnectProxy(ctx, snap)
case structs.ServiceKindTerminatingGateway:
return s.initWatchesTerminatingGateway(ctx)
case structs.ServiceKindMeshGateway:
return s.initWatchesMeshGateway(ctx)
case structs.ServiceKindIngressGateway:
return s.initWatchesIngressGateway(ctx)
default:
return fmt.Errorf("Unsupported service kind")
}
type handler struct {
stateConfig // TODO: un-embed
serviceInstance // TODO: un-embed
ch chan cache.UpdateEvent
}
func (s *state) watchMeshGateway(ctx context.Context, dc string, upstreamID string) error {
type handlerMeshGateway handler
type handlerTerminatingGateway handler
type handlerConnectProxy handler
type handlerIngressGateway handler
func (s *handlerUpstreams) watchMeshGateway(ctx context.Context, dc string, upstreamID string) error {
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: s.token},
@ -251,9 +262,40 @@ func (s *state) watchMeshGateway(ctx context.Context, dc string, upstreamID stri
}, "mesh-gateway:"+dc+":"+upstreamID, s.ch)
}
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
type handlerUpstreams handler
func (s *handlerUpstreams) watchConnectProxyService(ctx context.Context, correlationId string, target *structs.DiscoveryTarget) error {
return s.stateConfig.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
Datacenter: target.Datacenter,
QueryOptions: structs.QueryOptions{
Token: s.serviceInstance.token,
Filter: target.Subset.Filter,
},
ServiceName: target.Service,
Connect: true,
// Note that Identifier doesn't type-prefix for service any more as it's
// the default and makes metrics and other things much cleaner. It's
// simpler for us if we have the type to make things unambiguous.
Source: *s.stateConfig.source,
EnterpriseMeta: *target.GetEnterpriseMetadata(),
}, correlationId, s.ch)
}
// initialize sets up the watches needed based on current proxy registration
// state.
func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapshot) error {
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
@ -261,7 +303,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return err
return snap, err
}
// Watch the leaf cert
@ -272,7 +314,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, leafWatchID, s.ch)
if err != nil {
return err
return snap, err
}
// Watch for intention updates
@ -290,7 +332,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
},
}, intentionsWatchID, s.ch)
if err != nil {
return err
return snap, err
}
// Watch for service check updates
@ -299,7 +341,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
if err != nil {
return err
return snap, err
}
// default the namespace to the namespace of this proxy service
@ -314,7 +356,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()),
}, intentionUpstreamsID, s.ch)
if err != nil {
return err
return snap, err
}
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
@ -325,7 +367,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}, meshConfigEntryID, s.ch)
if err != nil {
return err
return snap, err
}
}
@ -381,7 +423,7 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
Source: *s.source,
}, "upstream:"+u.Identifier(), s.ch)
if err != nil {
return err
return snap, err
}
case structs.UpstreamDestTypeService:
@ -399,17 +441,18 @@ func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapsho
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
default:
return fmt.Errorf("unknown upstream type: %q", u.DestinationType)
return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
}
}
return nil
}
// reducedProxyConfig represents the basic opaque config values that are now
return snap, nil
}
// reducedUpstreamConfig represents the basic opaque config values that are now
// managed with the discovery chain but for backwards compatibility reasons
// should still affect how the proxy is configured.
//
@ -430,7 +473,8 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
}
// initWatchesTerminatingGateway sets up the initial watches needed based on the terminating-gateway registration
func (s *state) initWatchesTerminatingGateway(ctx context.Context) error {
func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
@ -438,9 +482,8 @@ func (s *state) initWatchesTerminatingGateway(ctx context.Context) error {
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
s.logger.Named(logging.TerminatingGateway).
Error("failed to register watch for root changes", "error", err)
return err
s.logger.Error("failed to register watch for root changes", "error", err)
return snap, err
}
// Watch for the terminating-gateway's linked services
@ -451,159 +494,10 @@ func (s *state) initWatchesTerminatingGateway(ctx context.Context) error {
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayServicesWatchID, s.ch)
if err != nil {
s.logger.Named(logging.TerminatingGateway).
Error("failed to register watch for linked services", "error", err)
return err
s.logger.Error("failed to register watch for linked services", "error", err)
return snap, err
}
return nil
}
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
func (s *state) initWatchesMeshGateway(ctx context.Context) error {
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return err
}
// Watch for all services
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
}, serviceListWatchID, s.ch)
if err != nil {
return err
}
if s.meta[structs.MetaWANFederationKey] == "1" {
// Conveniently we can just use this service meta attribute in one
// place here to set the machinery in motion and leave the conditional
// behavior out of the rest of the package.
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, federationStateListGatewaysWatchID, s.ch)
if err != nil {
return err
}
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
}, consulServerListWatchID, s.ch)
if err != nil {
return err
}
}
// Eventually we will have to watch connect enable instances for each service as well as the
// destination services themselves but those notifications will be setup later. However we
// cannot setup those watches until we know what the services are. from the service list
// watch above
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
}, datacentersWatchID, s.ch)
if err != nil {
return err
}
// Once we start getting notified about the datacenters we will setup watches on the
// gateways within those other datacenters. We cannot do that here because we don't
// know what they are yet.
// Watch service-resolvers so we can setup service subset clusters
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver,
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
}, serviceResolversWatchID, s.ch)
if err != nil {
s.logger.Named(logging.MeshGateway).
Error("failed to register watch for service-resolver config entries", "error", err)
return err
}
return err
}
func (s *state) initWatchesIngressGateway(ctx context.Context) error {
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return err
}
// Watch this ingress gateway's config entry
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
Kind: structs.IngressGateway,
Name: s.service,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayConfigWatchID, s.ch)
if err != nil {
return err
}
// Watch the ingress-gateway's list of upstreams
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.service,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayServicesWatchID, s.ch)
if err != nil {
return err
}
return nil
}
func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap := ConfigSnapshot{
Kind: s.kind,
Service: s.service,
ProxyID: s.proxyID,
Address: s.address,
Port: s.port,
ServiceMeta: s.meta,
TaggedAddresses: s.taggedAddresses,
Proxy: s.proxyCfg,
Datacenter: s.source.Datacenter,
ServerSNIFn: s.serverSNIFn,
IntentionDefaultAllow: s.intentionDefaultAllow,
}
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)
case structs.ServiceKindTerminatingGateway:
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc)
snap.TerminatingGateway.Intentions = make(map[structs.ServiceName]structs.Intentions)
@ -617,7 +511,86 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap.TerminatingGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
snap.TerminatingGateway.GatewayServices = make(map[structs.ServiceName]structs.GatewayService)
snap.TerminatingGateway.HostnameServices = make(map[structs.ServiceName]structs.CheckServiceNodes)
case structs.ServiceKindMeshGateway:
return snap, nil
}
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return snap, err
}
// Watch for all services
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
}, serviceListWatchID, s.ch)
if err != nil {
return snap, err
}
if s.meta[structs.MetaWANFederationKey] == "1" {
// Conveniently we can just use this service meta attribute in one
// place here to set the machinery in motion and leave the conditional
// behavior out of the rest of the package.
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, federationStateListGatewaysWatchID, s.ch)
if err != nil {
return snap, err
}
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
}, consulServerListWatchID, s.ch)
if err != nil {
return snap, err
}
}
// Eventually we will have to watch connect enable instances for each service as well as the
// destination services themselves but those notifications will be setup later. However we
// cannot setup those watches until we know what the services are. from the service list
// watch above
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
}, datacentersWatchID, s.ch)
if err != nil {
return snap, err
}
// Once we start getting notified about the datacenters we will setup watches on the
// gateways within those other datacenters. We cannot do that here because we don't
// know what they are yet.
// Watch service-resolvers so we can setup service subset clusters
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver,
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
}, serviceResolversWatchID, s.ch)
if err != nil {
s.logger.Named(logging.MeshGateway).
Error("failed to register watch for service-resolver config entries", "error", err)
return snap, err
}
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
@ -626,16 +599,68 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
// there is no need to initialize the map of service resolvers as we
// fully rebuild it every time we get updates
case structs.ServiceKindIngressGateway:
return snap, err
}
func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch)
if err != nil {
return snap, err
}
// Watch this ingress gateway's config entry
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
Kind: structs.IngressGateway,
Name: s.service,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayConfigWatchID, s.ch)
if err != nil {
return snap, err
}
// Watch the ingress-gateway's list of upstreams
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.service,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, gatewayServicesWatchID, s.ch)
if err != nil {
return snap, err
}
snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
return snap, nil
}
return snap
func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) ConfigSnapshot {
// TODO: use serviceInstance type in ConfigSnapshot
return ConfigSnapshot{
Kind: s.kind,
Service: s.service,
ProxyID: s.proxyID,
Address: s.address,
Port: s.port,
ServiceMeta: s.meta,
TaggedAddresses: s.taggedAddresses,
Proxy: s.proxyCfg,
Datacenter: config.source.Datacenter,
ServerSNIFn: config.serverSNIFn,
IntentionDefaultAllow: config.intentionDefaultAllow,
}
}
func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
@ -659,7 +684,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update")
if err := s.handleUpdate(ctx, u, snap); err != nil {
if err := s.handler.handleUpdate(ctx, u, snap); err != nil {
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
@ -671,9 +696,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"error", err,
)
s.logger.Error("Failed to copy config snapshot for proxy", "error", err)
continue
}
@ -719,9 +742,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"error", err,
)
s.logger.Error("Failed to copy config snapshot for proxy", "error", err)
continue
}
replyCh <- snapCopy
@ -748,22 +769,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
}
}
func (s *state) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.handleUpdateConnectProxy(ctx, u, snap)
case structs.ServiceKindTerminatingGateway:
return s.handleUpdateTerminatingGateway(ctx, u, snap)
case structs.ServiceKindMeshGateway:
return s.handleUpdateMeshGateway(ctx, u, snap)
case structs.ServiceKindIngressGateway:
return s.handleUpdateIngressGateway(ctx, u, snap)
default:
return fmt.Errorf("Unsupported service kind")
}
}
func (s *state) handleUpdateConnectProxy(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
@ -838,7 +844,7 @@ func (s *state) handleUpdateConnectProxy(ctx context.Context, u cache.UpdateEven
cfg: cfg,
meshGateway: meshGateway,
}
err = s.watchDiscoveryChain(ctx, snap, watchOpts)
err = (*handlerUpstreams)(s).watchDiscoveryChain(ctx, snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
}
@ -927,12 +933,12 @@ func (s *state) handleUpdateConnectProxy(ctx context.Context, u cache.UpdateEven
snap.ConnectProxy.MeshConfigSet = true
default:
return s.handleUpdateUpstreams(ctx, u, snap)
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
}
return nil
}
func (s *state) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
@ -1049,7 +1055,7 @@ func removeColonPrefix(s string) (string, string, bool) {
return s[0:idx], s[idx+1:], true
}
func (s *state) resetWatchesFromChain(
func (s *handlerUpstreams) resetWatchesFromChain(
ctx context.Context,
id string,
chain *structs.CompiledDiscoveryChain,
@ -1196,7 +1202,7 @@ type targetWatchOpts struct {
entMeta *structs.EnterpriseMeta
}
func (s *state) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
s.logger.Trace("initializing watch of target",
"upstream", opts.upstreamID,
"chain", opts.service,
@ -1233,11 +1239,11 @@ func (s *state) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUps
return nil
}
func (s *state) handleUpdateTerminatingGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
logger := s.logger.Named(logging.TerminatingGateway)
logger := s.logger
switch {
case u.CorrelationID == rootsWatchID:
@ -1461,7 +1467,7 @@ func (s *state) handleUpdateTerminatingGateway(ctx context.Context, u cache.Upda
if len(resp.Nodes) > 0 {
snap.TerminatingGateway.ServiceGroups[sn] = resp.Nodes
snap.TerminatingGateway.HostnameServices[sn] = hostnameEndpoints(
s.logger.Named(logging.TerminatingGateway), snap.Datacenter, resp.Nodes)
s.logger, snap.Datacenter, resp.Nodes)
}
// Store leaf cert for watched service
@ -1519,7 +1525,7 @@ func (s *state) handleUpdateTerminatingGateway(ctx context.Context, u cache.Upda
return nil
}
func (s *state) handleUpdateMeshGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
@ -1722,7 +1728,7 @@ func (s *state) handleUpdateMeshGateway(ctx context.Context, u cache.UpdateEvent
return nil
}
func (s *state) handleUpdateIngressGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
@ -1770,7 +1776,7 @@ func (s *state) handleUpdateIngressGateway(ctx context.Context, u cache.UpdateEv
namespace: u.DestinationNamespace,
datacenter: s.source.Datacenter,
}
err := s.watchDiscoveryChain(ctx, snap, watchOpts)
err := (*handlerUpstreams)(s).watchDiscoveryChain(ctx, snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
@ -1798,7 +1804,7 @@ func (s *state) handleUpdateIngressGateway(ctx context.Context, u cache.UpdateEv
}
default:
return s.handleUpdateUpstreams(ctx, u, snap)
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
}
return nil
@ -1831,7 +1837,7 @@ type discoveryChainWatchOpts struct {
meshGateway structs.MeshGatewayConfig
}
func (s *state) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
return nil
}
@ -1865,7 +1871,7 @@ func (s *state) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, o
return nil
}
func (s *state) generateIngressDNSSANs(snap *ConfigSnapshot) []string {
func (s *handlerIngressGateway) generateIngressDNSSANs(snap *ConfigSnapshot) []string {
// Update our leaf cert watch with wildcard entries for our DNS domains as well as any
// configured custom hostnames from the service.
if !snap.IngressGateway.TLSEnabled {
@ -1902,7 +1908,7 @@ func (s *state) generateIngressDNSSANs(snap *ConfigSnapshot) []string {
return dnsNames
}
func (s *state) watchIngressLeafCert(ctx context.Context, snap *ConfigSnapshot) error {
func (s *handlerIngressGateway) watchIngressLeafCert(ctx context.Context, snap *ConfigSnapshot) error {
if !snap.IngressGateway.TLSSet || !snap.IngressGateway.HostsSet {
return nil
}
@ -1951,12 +1957,13 @@ func (s *state) Changed(ns *structs.NodeService, token string) bool {
s.logger.Warn("Failed to parse proxy config and will treat the new service as unchanged")
}
return ns.Kind != s.kind ||
s.proxyID != ns.CompoundServiceID() ||
s.address != ns.Address ||
s.port != ns.Port ||
!reflect.DeepEqual(s.proxyCfg, proxyCfg) ||
s.token != token
i := s.serviceInstance
return ns.Kind != i.kind ||
i.proxyID != ns.CompoundServiceID() ||
i.address != ns.Address ||
i.port != ns.Port ||
!reflect.DeepEqual(i.proxyCfg, proxyCfg) ||
i.token != token
}
// hostnameEndpoints returns all CheckServiceNodes that have hostnames instead of IPs as the address.

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
@ -115,7 +116,7 @@ func TestStateChanged(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
state, err := newState(tt.ns, tt.token)
state, err := newState(tt.ns, tt.token, stateConfig{logger: hclog.New(nil)})
require.NoError(err)
otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
require.Equal(tt.want, state.Changed(otherNS, otherToken))
@ -2125,7 +2126,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
state, err := newState(&tc.ns, "")
cn := newTestCacheNotifier()
state, err := newState(&tc.ns, "", stateConfig{
logger: testutil.Logger(t),
cache: cn,
health: &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName},
source: &structs.QuerySource{
Datacenter: tc.sourceDC,
},
dnsConfig: DNSConfig{
Domain: "consul.",
AltDomain: "alt.consul.",
},
})
// verify building the initial state worked
require.NoError(t, err)
@ -2134,30 +2147,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// setup the test logger to use the t.Log
state.logger = testutil.Logger(t)
// setup a new testing cache notifier
cn := newTestCacheNotifier()
state.cache = cn
state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}
// setup the local datacenter information
state.source = &structs.QuerySource{
Datacenter: tc.sourceDC,
}
state.dnsConfig = DNSConfig{
Domain: "consul.",
AltDomain: "alt.consul.",
}
// setup the ctx as initWatches expects this to be there
var ctx context.Context
ctx, state.cancel = context.WithCancel(context.Background())
// get the initial configuration snapshot
snap := state.initialConfigSnapshot()
// ensure the initial watch setup did not error
require.NoError(t, state.initWatches(ctx, &snap))
snap, err := state.handler.initialize(ctx)
require.NoError(t, err)
//--------------------------------------------------------------------
//
@ -2184,7 +2179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// therefore we just tell it about the updates
for eveIdx, event := range stage.events {
require.True(t, t.Run(fmt.Sprintf("update-%d", eveIdx), func(t *testing.T) {
require.NoError(t, state.handleUpdate(ctx, event, &snap))
require.NoError(t, state.handler.handleUpdate(ctx, event, &snap))
}))
}