diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 4858d1f336..d0ac677f1a 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -195,13 +195,15 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string return err } - // Set the necessary dependencies - state.logger = m.Logger.With("service_id", sid.String()) - state.cache = m.Cache - state.health = m.Health - state.source = m.Source - state.dnsConfig = m.DNSConfig - state.intentionDefaultAllow = m.IntentionDefaultAllow + // TODO: move to a function that translates ManagerConfig->stateConfig + state.stateConfig = stateConfig{ + logger: m.Logger.With("service_id", sid.String()), + cache: m.Cache, + health: m.Health, + source: m.Source, + dnsConfig: m.DNSConfig, + intentionDefaultAllow: m.IntentionDefaultAllow, + } if m.TLSConfigurator != nil { state.serverSNIFn = m.TLSConfigurator.ServerSNI } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index e5c94526be..abf262299a 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -49,16 +49,11 @@ const ( intentionUpstreamsID = "intention-upstreams" meshConfigEntryID = "mesh" svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":" - serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":" preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" defaultPreparedQueryPollInterval = 30 * time.Second ) -// state holds all the state needed to maintain the config for a registered -// connect-proxy service. When a proxy registration is changed, the entire state -// is discarded and a new one created. -type state struct { - // logger, source and cache are required to be set before calling Watch. +type stateConfig struct { logger hclog.Logger source *structs.QuerySource cache CacheNotifier @@ -66,21 +61,21 @@ type state struct { dnsConfig DNSConfig serverSNIFn ServerSNIFunc intentionDefaultAllow bool +} + +// state holds all the state needed to maintain the config for a registered +// connect-proxy service. When a proxy registration is changed, the entire state +// is discarded and a new one created. +type state struct { + // TODO: un-embedd once refactor is complete + stateConfig + // TODO: un-embed once refactor is complete + serviceInstance // cancel is set by Watch and called by Close to stop the goroutine started // in Watch. cancel func() - kind structs.ServiceKind - service string - proxyID structs.ServiceID - address string - port int - meta map[string]string - taggedAddresses map[string]structs.ServiceAddress - proxyCfg structs.ConnectProxyConfig - token string - ch chan cache.UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot @@ -93,6 +88,18 @@ type DNSConfig struct { type ServerSNIFunc func(dc, nodeName string) string +type serviceInstance struct { + kind structs.ServiceKind + service string + proxyID structs.ServiceID + address string + port int + meta map[string]string + taggedAddresses map[string]structs.ServiceAddress + proxyCfg structs.ConnectProxyConfig + token string +} + func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) { if ns == nil { return structs.ConnectProxyConfig{}, nil @@ -139,11 +146,33 @@ func newState(ns *structs.NodeService, token string) (*state, error) { return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway") } - proxyCfg, err := copyProxyConfig(ns) + s, err := newServiceInstanceFromNodeService(ns, token) if err != nil { return nil, err } + return &state{ + serviceInstance: s, + + // 10 is fairly arbitrary here but allow for the 3 mandatory and a + // reasonable number of upstream watches to all deliver their initial + // messages in parallel without blocking the cache.Notify loops. It's not a + // huge deal if we do for a short period so we don't need to be more + // conservative to handle larger numbers of upstreams correctly but gives + // some head room for normal operation to be non-blocking in most typical + // cases. + ch: make(chan cache.UpdateEvent, 10), + snapCh: make(chan ConfigSnapshot, 1), + reqCh: make(chan chan *ConfigSnapshot, 1), + }, nil +} + +func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (serviceInstance, error) { + proxyCfg, err := copyProxyConfig(ns) + if err != nil { + return serviceInstance{}, err + } + taggedAddresses := make(map[string]structs.ServiceAddress) for k, v := range ns.TaggedAddresses { taggedAddresses[k] = v @@ -154,7 +183,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) { meta[k] = v } - return &state{ + return serviceInstance{ kind: ns.Kind, service: ns.Service, proxyID: ns.CompoundServiceID(), @@ -164,17 +193,6 @@ func newState(ns *structs.NodeService, token string) (*state, error) { taggedAddresses: taggedAddresses, proxyCfg: proxyCfg, token: token, - - // 10 is fairly arbitrary here but allow for the 3 mandatory and a - // reasonable number of upstream watches to all deliver their initial - // messages in parallel without blocking the cache.Notify loops. It's not a - // huge deal if we do for a short period so we don't need to be more - // conservative to handle larger numbers of upstreams correctly but gives - // some head room for normal operation to be non-blocking in most typical - // cases. - ch: make(chan cache.UpdateEvent, 10), - snapCh: make(chan ConfigSnapshot, 1), - reqCh: make(chan chan *ConfigSnapshot, 1), }, nil }