mirror of https://github.com/status-im/consul.git
Merge pull request #9466 from hashicorp/dnephin/proxycfg-state
proxycfg: prepare state for split by kind
This commit is contained in:
commit
0547d0c046
|
@ -195,13 +195,15 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the necessary dependencies
|
// TODO: move to a function that translates ManagerConfig->stateConfig
|
||||||
state.logger = m.Logger.With("service_id", sid.String())
|
state.stateConfig = stateConfig{
|
||||||
state.cache = m.Cache
|
logger: m.Logger.With("service_id", sid.String()),
|
||||||
state.health = m.Health
|
cache: m.Cache,
|
||||||
state.source = m.Source
|
health: m.Health,
|
||||||
state.dnsConfig = m.DNSConfig
|
source: m.Source,
|
||||||
state.intentionDefaultAllow = m.IntentionDefaultAllow
|
dnsConfig: m.DNSConfig,
|
||||||
|
intentionDefaultAllow: m.IntentionDefaultAllow,
|
||||||
|
}
|
||||||
if m.TLSConfigurator != nil {
|
if m.TLSConfigurator != nil {
|
||||||
state.serverSNIFn = m.TLSConfigurator.ServerSNI
|
state.serverSNIFn = m.TLSConfigurator.ServerSNI
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,16 +49,11 @@ const (
|
||||||
intentionUpstreamsID = "intention-upstreams"
|
intentionUpstreamsID = "intention-upstreams"
|
||||||
meshConfigEntryID = "mesh"
|
meshConfigEntryID = "mesh"
|
||||||
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
|
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
|
||||||
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
|
|
||||||
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
|
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
|
||||||
defaultPreparedQueryPollInterval = 30 * time.Second
|
defaultPreparedQueryPollInterval = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// state holds all the state needed to maintain the config for a registered
|
type stateConfig struct {
|
||||||
// connect-proxy service. When a proxy registration is changed, the entire state
|
|
||||||
// is discarded and a new one created.
|
|
||||||
type state struct {
|
|
||||||
// logger, source and cache are required to be set before calling Watch.
|
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
source *structs.QuerySource
|
source *structs.QuerySource
|
||||||
cache CacheNotifier
|
cache CacheNotifier
|
||||||
|
@ -66,21 +61,21 @@ type state struct {
|
||||||
dnsConfig DNSConfig
|
dnsConfig DNSConfig
|
||||||
serverSNIFn ServerSNIFunc
|
serverSNIFn ServerSNIFunc
|
||||||
intentionDefaultAllow bool
|
intentionDefaultAllow bool
|
||||||
|
}
|
||||||
|
|
||||||
// ctx and cancel store the context created during initWatches call
|
// state holds all the state needed to maintain the config for a registered
|
||||||
ctx context.Context
|
// connect-proxy service. When a proxy registration is changed, the entire state
|
||||||
|
// is discarded and a new one created.
|
||||||
|
type state struct {
|
||||||
|
// TODO: un-embedd once refactor is complete
|
||||||
|
stateConfig
|
||||||
|
// TODO: un-embed once refactor is complete
|
||||||
|
serviceInstance
|
||||||
|
|
||||||
|
// cancel is set by Watch and called by Close to stop the goroutine started
|
||||||
|
// in Watch.
|
||||||
cancel func()
|
cancel func()
|
||||||
|
|
||||||
kind structs.ServiceKind
|
|
||||||
service string
|
|
||||||
proxyID structs.ServiceID
|
|
||||||
address string
|
|
||||||
port int
|
|
||||||
meta map[string]string
|
|
||||||
taggedAddresses map[string]structs.ServiceAddress
|
|
||||||
proxyCfg structs.ConnectProxyConfig
|
|
||||||
token string
|
|
||||||
|
|
||||||
ch chan cache.UpdateEvent
|
ch chan cache.UpdateEvent
|
||||||
snapCh chan ConfigSnapshot
|
snapCh chan ConfigSnapshot
|
||||||
reqCh chan chan *ConfigSnapshot
|
reqCh chan chan *ConfigSnapshot
|
||||||
|
@ -93,6 +88,18 @@ type DNSConfig struct {
|
||||||
|
|
||||||
type ServerSNIFunc func(dc, nodeName string) string
|
type ServerSNIFunc func(dc, nodeName string) string
|
||||||
|
|
||||||
|
type serviceInstance struct {
|
||||||
|
kind structs.ServiceKind
|
||||||
|
service string
|
||||||
|
proxyID structs.ServiceID
|
||||||
|
address string
|
||||||
|
port int
|
||||||
|
meta map[string]string
|
||||||
|
taggedAddresses map[string]structs.ServiceAddress
|
||||||
|
proxyCfg structs.ConnectProxyConfig
|
||||||
|
token string
|
||||||
|
}
|
||||||
|
|
||||||
func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
|
func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
|
||||||
if ns == nil {
|
if ns == nil {
|
||||||
return structs.ConnectProxyConfig{}, nil
|
return structs.ConnectProxyConfig{}, nil
|
||||||
|
@ -139,31 +146,13 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
|
||||||
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
|
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyCfg, err := copyProxyConfig(ns)
|
s, err := newServiceInstanceFromNodeService(ns, token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
taggedAddresses := make(map[string]structs.ServiceAddress)
|
|
||||||
for k, v := range ns.TaggedAddresses {
|
|
||||||
taggedAddresses[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
meta := make(map[string]string)
|
|
||||||
for k, v := range ns.Meta {
|
|
||||||
meta[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return &state{
|
return &state{
|
||||||
kind: ns.Kind,
|
serviceInstance: s,
|
||||||
service: ns.Service,
|
|
||||||
proxyID: ns.CompoundServiceID(),
|
|
||||||
address: ns.Address,
|
|
||||||
port: ns.Port,
|
|
||||||
meta: meta,
|
|
||||||
taggedAddresses: taggedAddresses,
|
|
||||||
proxyCfg: proxyCfg,
|
|
||||||
token: token,
|
|
||||||
|
|
||||||
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
|
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
|
||||||
// reasonable number of upstream watches to all deliver their initial
|
// reasonable number of upstream watches to all deliver their initial
|
||||||
|
@ -178,21 +167,51 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (serviceInstance, error) {
|
||||||
|
proxyCfg, err := copyProxyConfig(ns)
|
||||||
|
if err != nil {
|
||||||
|
return serviceInstance{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
taggedAddresses := make(map[string]structs.ServiceAddress)
|
||||||
|
for k, v := range ns.TaggedAddresses {
|
||||||
|
taggedAddresses[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
meta := make(map[string]string)
|
||||||
|
for k, v := range ns.Meta {
|
||||||
|
meta[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceInstance{
|
||||||
|
kind: ns.Kind,
|
||||||
|
service: ns.Service,
|
||||||
|
proxyID: ns.CompoundServiceID(),
|
||||||
|
address: ns.Address,
|
||||||
|
port: ns.Port,
|
||||||
|
meta: meta,
|
||||||
|
taggedAddresses: taggedAddresses,
|
||||||
|
proxyCfg: proxyCfg,
|
||||||
|
token: token,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Watch initialized watches on all necessary cache data for the current proxy
|
// Watch initialized watches on all necessary cache data for the current proxy
|
||||||
// registration state and returns a chan to observe updates to the
|
// registration state and returns a chan to observe updates to the
|
||||||
// ConfigSnapshot that contains all necessary config state. The chan is closed
|
// ConfigSnapshot that contains all necessary config state. The chan is closed
|
||||||
// when the state is Closed.
|
// when the state is Closed.
|
||||||
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
|
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
|
||||||
s.ctx, s.cancel = context.WithCancel(context.Background())
|
var ctx context.Context
|
||||||
|
ctx, s.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
snap := s.initialConfigSnapshot()
|
snap := s.initialConfigSnapshot()
|
||||||
err := s.initWatches(&snap)
|
err := s.initWatches(ctx, &snap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.cancel()
|
s.cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.run(&snap)
|
go s.run(ctx, &snap)
|
||||||
|
|
||||||
return s.snapCh, nil
|
return s.snapCh, nil
|
||||||
}
|
}
|
||||||
|
@ -206,16 +225,16 @@ func (s *state) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initWatches sets up the watches needed for the particular service
|
// initWatches sets up the watches needed for the particular service
|
||||||
func (s *state) initWatches(snap *ConfigSnapshot) error {
|
func (s *state) initWatches(ctx context.Context, snap *ConfigSnapshot) error {
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.initWatchesConnectProxy(snap)
|
return s.initWatchesConnectProxy(ctx, snap)
|
||||||
case structs.ServiceKindTerminatingGateway:
|
case structs.ServiceKindTerminatingGateway:
|
||||||
return s.initWatchesTerminatingGateway()
|
return s.initWatchesTerminatingGateway(ctx)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.initWatchesMeshGateway()
|
return s.initWatchesMeshGateway(ctx)
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
return s.initWatchesIngressGateway()
|
return s.initWatchesIngressGateway(ctx)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unsupported service kind")
|
return fmt.Errorf("Unsupported service kind")
|
||||||
}
|
}
|
||||||
|
@ -234,9 +253,9 @@ func (s *state) watchMeshGateway(ctx context.Context, dc string, upstreamID stri
|
||||||
|
|
||||||
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
|
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
|
||||||
// state.
|
// state.
|
||||||
func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
func (s *state) initWatchesConnectProxy(ctx context.Context, snap *ConfigSnapshot) error {
|
||||||
// Watch for root changes
|
// Watch for root changes
|
||||||
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -246,7 +265,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch the leaf cert
|
// Watch the leaf cert
|
||||||
err = s.cache.Notify(s.ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
Token: s.token,
|
Token: s.token,
|
||||||
Service: s.proxyCfg.DestinationServiceName,
|
Service: s.proxyCfg.DestinationServiceName,
|
||||||
|
@ -257,7 +276,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch for intention updates
|
// Watch for intention updates
|
||||||
err = s.cache.Notify(s.ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Match: &structs.IntentionQueryMatch{
|
Match: &structs.IntentionQueryMatch{
|
||||||
|
@ -275,7 +294,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch for service check updates
|
// Watch for service check updates
|
||||||
err = s.cache.Notify(s.ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
|
err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
|
||||||
ServiceID: s.proxyCfg.DestinationServiceID,
|
ServiceID: s.proxyCfg.DestinationServiceID,
|
||||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||||
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
||||||
|
@ -288,7 +307,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
|
|
||||||
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
||||||
// When in transparent proxy we will infer upstreams from intentions with this source
|
// When in transparent proxy we will infer upstreams from intentions with this source
|
||||||
err := s.cache.Notify(s.ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
|
err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceName: s.proxyCfg.DestinationServiceName,
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
||||||
|
@ -298,7 +317,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.cache.Notify(s.ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
||||||
Kind: structs.MeshConfig,
|
Kind: structs.MeshConfig,
|
||||||
Name: structs.MeshConfigMesh,
|
Name: structs.MeshConfigMesh,
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
|
@ -354,7 +373,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
|
|
||||||
switch u.DestinationType {
|
switch u.DestinationType {
|
||||||
case structs.UpstreamDestTypePreparedQuery:
|
case structs.UpstreamDestTypePreparedQuery:
|
||||||
err = s.cache.Notify(s.ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
||||||
QueryIDOrName: u.DestinationName,
|
QueryIDOrName: u.DestinationName,
|
||||||
|
@ -369,7 +388,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
fallthrough
|
fallthrough
|
||||||
|
|
||||||
case "": // Treat unset as the default Service type
|
case "": // Treat unset as the default Service type
|
||||||
err = s.cache.Notify(s.ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Name: u.DestinationName,
|
Name: u.DestinationName,
|
||||||
|
@ -411,9 +430,9 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// initWatchesTerminatingGateway sets up the initial watches needed based on the terminating-gateway registration
|
// initWatchesTerminatingGateway sets up the initial watches needed based on the terminating-gateway registration
|
||||||
func (s *state) initWatchesTerminatingGateway() error {
|
func (s *state) initWatchesTerminatingGateway(ctx context.Context) error {
|
||||||
// Watch for root changes
|
// Watch for root changes
|
||||||
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -425,7 +444,7 @@ func (s *state) initWatchesTerminatingGateway() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch for the terminating-gateway's linked services
|
// Watch for the terminating-gateway's linked services
|
||||||
err = s.cache.Notify(s.ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceName: s.service,
|
ServiceName: s.service,
|
||||||
|
@ -441,9 +460,9 @@ func (s *state) initWatchesTerminatingGateway() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
|
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
|
||||||
func (s *state) initWatchesMeshGateway() error {
|
func (s *state) initWatchesMeshGateway(ctx context.Context) error {
|
||||||
// Watch for root changes
|
// Watch for root changes
|
||||||
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -453,7 +472,7 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch for all services
|
// Watch for all services
|
||||||
err = s.cache.Notify(s.ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -468,7 +487,7 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
// Conveniently we can just use this service meta attribute in one
|
// Conveniently we can just use this service meta attribute in one
|
||||||
// place here to set the machinery in motion and leave the conditional
|
// place here to set the machinery in motion and leave the conditional
|
||||||
// behavior out of the rest of the package.
|
// behavior out of the rest of the package.
|
||||||
err = s.cache.Notify(s.ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -477,7 +496,7 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{
|
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceName: structs.ConsulServiceName,
|
ServiceName: structs.ConsulServiceName,
|
||||||
|
@ -492,7 +511,7 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
// cannot setup those watches until we know what the services are. from the service list
|
// cannot setup those watches until we know what the services are. from the service list
|
||||||
// watch above
|
// watch above
|
||||||
|
|
||||||
err = s.cache.Notify(s.ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
||||||
}, datacentersWatchID, s.ch)
|
}, datacentersWatchID, s.ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -504,7 +523,7 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
// know what they are yet.
|
// know what they are yet.
|
||||||
|
|
||||||
// Watch service-resolvers so we can setup service subset clusters
|
// Watch service-resolvers so we can setup service subset clusters
|
||||||
err = s.cache.Notify(s.ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Kind: structs.ServiceResolver,
|
Kind: structs.ServiceResolver,
|
||||||
|
@ -520,9 +539,9 @@ func (s *state) initWatchesMeshGateway() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) initWatchesIngressGateway() error {
|
func (s *state) initWatchesIngressGateway(ctx context.Context) error {
|
||||||
// Watch for root changes
|
// Watch for root changes
|
||||||
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
|
@ -532,7 +551,7 @@ func (s *state) initWatchesIngressGateway() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch this ingress gateway's config entry
|
// Watch this ingress gateway's config entry
|
||||||
err = s.cache.Notify(s.ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
||||||
Kind: structs.IngressGateway,
|
Kind: structs.IngressGateway,
|
||||||
Name: s.service,
|
Name: s.service,
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
|
@ -544,7 +563,7 @@ func (s *state) initWatchesIngressGateway() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch the ingress-gateway's list of upstreams
|
// Watch the ingress-gateway's list of upstreams
|
||||||
err = s.cache.Notify(s.ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceName: s.service,
|
ServiceName: s.service,
|
||||||
|
@ -619,7 +638,7 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
|
||||||
return snap
|
return snap
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) run(snap *ConfigSnapshot) {
|
func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
||||||
// Close the channel we return from Watch when we stop so consumers can stop
|
// Close the channel we return from Watch when we stop so consumers can stop
|
||||||
// watching and clean up their goroutines. It's important we do this here and
|
// watching and clean up their goroutines. It's important we do this here and
|
||||||
// not in Close since this routine sends on this chan and so might panic if it
|
// not in Close since this routine sends on this chan and so might panic if it
|
||||||
|
@ -635,12 +654,12 @@ func (s *state) run(snap *ConfigSnapshot) {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case u := <-s.ch:
|
case u := <-s.ch:
|
||||||
s.logger.Trace("A blocking query returned; handling snapshot update")
|
s.logger.Trace("A blocking query returned; handling snapshot update")
|
||||||
|
|
||||||
if err := s.handleUpdate(u, snap); err != nil {
|
if err := s.handleUpdate(ctx, u, snap); err != nil {
|
||||||
s.logger.Error("Failed to handle update from watch",
|
s.logger.Error("Failed to handle update from watch",
|
||||||
"id", u.CorrelationID, "error", err,
|
"id", u.CorrelationID, "error", err,
|
||||||
)
|
)
|
||||||
|
@ -729,22 +748,22 @@ func (s *state) run(snap *ConfigSnapshot) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.handleUpdateConnectProxy(u, snap)
|
return s.handleUpdateConnectProxy(ctx, u, snap)
|
||||||
case structs.ServiceKindTerminatingGateway:
|
case structs.ServiceKindTerminatingGateway:
|
||||||
return s.handleUpdateTerminatingGateway(u, snap)
|
return s.handleUpdateTerminatingGateway(ctx, u, snap)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.handleUpdateMeshGateway(u, snap)
|
return s.handleUpdateMeshGateway(ctx, u, snap)
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
return s.handleUpdateIngressGateway(u, snap)
|
return s.handleUpdateIngressGateway(ctx, u, snap)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unsupported service kind")
|
return fmt.Errorf("Unsupported service kind")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateConnectProxy(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
}
|
}
|
||||||
|
@ -819,7 +838,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
meshGateway: meshGateway,
|
meshGateway: meshGateway,
|
||||||
}
|
}
|
||||||
err = s.watchDiscoveryChain(snap, watchOpts)
|
err = s.watchDiscoveryChain(ctx, snap, watchOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
||||||
}
|
}
|
||||||
|
@ -908,12 +927,12 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
||||||
snap.ConnectProxy.MeshConfigSet = true
|
snap.ConnectProxy.MeshConfigSet = true
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return s.handleUpdateUpstreams(u, snap)
|
return s.handleUpdateUpstreams(ctx, u, snap)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateUpstreams(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
}
|
}
|
||||||
|
@ -939,7 +958,7 @@ func (s *state) handleUpdateUpstreams(u cache.UpdateEvent, snap *ConfigSnapshot)
|
||||||
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
||||||
upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain
|
upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain
|
||||||
|
|
||||||
if err := s.resetWatchesFromChain(svc, resp.Chain, upstreamsSnapshot); err != nil {
|
if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1031,6 +1050,7 @@ func removeColonPrefix(s string) (string, string, bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) resetWatchesFromChain(
|
func (s *state) resetWatchesFromChain(
|
||||||
|
ctx context.Context,
|
||||||
id string,
|
id string,
|
||||||
chain *structs.CompiledDiscoveryChain,
|
chain *structs.CompiledDiscoveryChain,
|
||||||
snap *ConfigSnapshotUpstreams,
|
snap *ConfigSnapshotUpstreams,
|
||||||
|
@ -1089,7 +1109,7 @@ func (s *state) resetWatchesFromChain(
|
||||||
datacenter: target.Datacenter,
|
datacenter: target.Datacenter,
|
||||||
entMeta: target.GetEnterpriseMetadata(),
|
entMeta: target.GetEnterpriseMetadata(),
|
||||||
}
|
}
|
||||||
err := s.watchUpstreamTarget(snap, opts)
|
err := s.watchUpstreamTarget(ctx, snap, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
|
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
|
||||||
}
|
}
|
||||||
|
@ -1123,7 +1143,7 @@ func (s *state) resetWatchesFromChain(
|
||||||
datacenter: chain.Datacenter,
|
datacenter: chain.Datacenter,
|
||||||
entMeta: &chainEntMeta,
|
entMeta: &chainEntMeta,
|
||||||
}
|
}
|
||||||
err := s.watchUpstreamTarget(snap, opts)
|
err := s.watchUpstreamTarget(ctx, snap, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
|
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
|
||||||
}
|
}
|
||||||
|
@ -1140,7 +1160,7 @@ func (s *state) resetWatchesFromChain(
|
||||||
"datacenter", dc,
|
"datacenter", dc,
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.watchMeshGateway(ctx, dc, id)
|
err := s.watchMeshGateway(ctx, dc, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
|
@ -1176,7 +1196,7 @@ type targetWatchOpts struct {
|
||||||
entMeta *structs.EnterpriseMeta
|
entMeta *structs.EnterpriseMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) watchUpstreamTarget(snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
func (s *state) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
||||||
s.logger.Trace("initializing watch of target",
|
s.logger.Trace("initializing watch of target",
|
||||||
"upstream", opts.upstreamID,
|
"upstream", opts.upstreamID,
|
||||||
"chain", opts.service,
|
"chain", opts.service,
|
||||||
|
@ -1188,7 +1208,7 @@ func (s *state) watchUpstreamTarget(snap *ConfigSnapshotUpstreams, opts targetWa
|
||||||
|
|
||||||
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
|
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||||
Datacenter: opts.datacenter,
|
Datacenter: opts.datacenter,
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
|
@ -1213,7 +1233,7 @@ func (s *state) watchUpstreamTarget(snap *ConfigSnapshotUpstreams, opts targetWa
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateTerminatingGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
}
|
}
|
||||||
|
@ -1244,7 +1264,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
|
|
||||||
// Watch the health endpoint to discover endpoints for the service
|
// Watch the health endpoint to discover endpoints for the service
|
||||||
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
|
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1269,7 +1289,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
// Watch intentions with this service as their destination
|
// Watch intentions with this service as their destination
|
||||||
// The gateway will enforce intentions for connections to the service
|
// The gateway will enforce intentions for connections to the service
|
||||||
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok {
|
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1298,7 +1318,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
// Watch leaf certificate for the service
|
// Watch leaf certificate for the service
|
||||||
// This cert is used to terminate mTLS connections on the service's behalf
|
// This cert is used to terminate mTLS connections on the service's behalf
|
||||||
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
|
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
Token: s.token,
|
Token: s.token,
|
||||||
|
@ -1320,7 +1340,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
// Watch service configs for the service.
|
// Watch service configs for the service.
|
||||||
// These are used to determine the protocol for the target service.
|
// These are used to determine the protocol for the target service.
|
||||||
if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok {
|
if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{
|
err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1342,7 +1362,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
// Watch service resolvers for the service
|
// Watch service resolvers for the service
|
||||||
// These are used to create clusters and endpoints for the service subsets
|
// These are used to create clusters and endpoints for the service subsets
|
||||||
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok {
|
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
err := s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1498,7 +1518,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateMeshGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
}
|
}
|
||||||
|
@ -1542,7 +1562,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
||||||
svcMap[svc] = struct{}{}
|
svcMap[svc] = struct{}{}
|
||||||
|
|
||||||
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
|
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1591,7 +1611,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := snap.MeshGateway.WatchedDatacenters[dc]; !ok {
|
if _, ok := snap.MeshGateway.WatchedDatacenters[dc]; !ok {
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1699,7 +1719,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateIngressGateway(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
}
|
}
|
||||||
|
@ -1724,7 +1744,7 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
|
||||||
snap.IngressGateway.TLSEnabled = gatewayConf.TLS.Enabled
|
snap.IngressGateway.TLSEnabled = gatewayConf.TLS.Enabled
|
||||||
snap.IngressGateway.TLSSet = true
|
snap.IngressGateway.TLSSet = true
|
||||||
|
|
||||||
if err := s.watchIngressLeafCert(snap); err != nil {
|
if err := s.watchIngressLeafCert(ctx, snap); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1747,7 +1767,7 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
|
||||||
namespace: u.DestinationNamespace,
|
namespace: u.DestinationNamespace,
|
||||||
datacenter: s.source.Datacenter,
|
datacenter: s.source.Datacenter,
|
||||||
}
|
}
|
||||||
err := s.watchDiscoveryChain(snap, watchOpts)
|
err := s.watchDiscoveryChain(ctx, snap, watchOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
||||||
}
|
}
|
||||||
|
@ -1770,12 +1790,12 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.watchIngressLeafCert(snap); err != nil {
|
if err := s.watchIngressLeafCert(ctx, snap); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return s.handleUpdateUpstreams(u, snap)
|
return s.handleUpdateUpstreams(ctx, u, snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1808,12 +1828,12 @@ type discoveryChainWatchOpts struct {
|
||||||
meshGateway structs.MeshGatewayConfig
|
meshGateway structs.MeshGatewayConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
func (s *state) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
||||||
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -1879,7 +1899,7 @@ func (s *state) generateIngressDNSSANs(snap *ConfigSnapshot) []string {
|
||||||
return dnsNames
|
return dnsNames
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) watchIngressLeafCert(snap *ConfigSnapshot) error {
|
func (s *state) watchIngressLeafCert(ctx context.Context, snap *ConfigSnapshot) error {
|
||||||
if !snap.IngressGateway.TLSSet || !snap.IngressGateway.HostsSet {
|
if !snap.IngressGateway.TLSSet || !snap.IngressGateway.HostsSet {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1888,7 +1908,7 @@ func (s *state) watchIngressLeafCert(snap *ConfigSnapshot) error {
|
||||||
if snap.IngressGateway.LeafCertWatchCancel != nil {
|
if snap.IngressGateway.LeafCertWatchCancel != nil {
|
||||||
snap.IngressGateway.LeafCertWatchCancel()
|
snap.IngressGateway.LeafCertWatchCancel()
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(s.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
Token: s.token,
|
Token: s.token,
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -2151,13 +2152,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup the ctx as initWatches expects this to be there
|
// setup the ctx as initWatches expects this to be there
|
||||||
state.ctx, state.cancel = context.WithCancel(context.Background())
|
var ctx context.Context
|
||||||
|
ctx, state.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
// get the initial configuration snapshot
|
// get the initial configuration snapshot
|
||||||
snap := state.initialConfigSnapshot()
|
snap := state.initialConfigSnapshot()
|
||||||
|
|
||||||
// ensure the initial watch setup did not error
|
// ensure the initial watch setup did not error
|
||||||
require.NoError(t, state.initWatches(&snap))
|
require.NoError(t, state.initWatches(ctx, &snap))
|
||||||
|
|
||||||
//--------------------------------------------------------------------
|
//--------------------------------------------------------------------
|
||||||
//
|
//
|
||||||
|
@ -2184,7 +2186,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
// therefore we just tell it about the updates
|
// therefore we just tell it about the updates
|
||||||
for eveIdx, event := range stage.events {
|
for eveIdx, event := range stage.events {
|
||||||
require.True(t, t.Run(fmt.Sprintf("update-%d", eveIdx), func(t *testing.T) {
|
require.True(t, t.Run(fmt.Sprintf("update-%d", eveIdx), func(t *testing.T) {
|
||||||
require.NoError(t, state.handleUpdate(event, &snap))
|
require.NoError(t, state.handleUpdate(ctx, event, &snap))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue