diff --git a/agent/service_manager.go b/agent/service_manager.go index 915cf9dfd8..16d15255be 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -27,10 +27,11 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for processing service registrations in the - // background when watches are notified of changes. All sends and receives - // must also obey the ctx.Done() channel to avoid a deadlock during - // shutdown. + // registerCh is a channel for receiving service registration requests from + // from serviceConfigWatchers. + // The registrations are handled in the background when watches are notified of + // changes. All sends and receives must also obey the ctx.Done() channel to + // avoid a deadlock during shutdown. registerCh chan *asyncRegisterRequest // ctx is the shared context for all goroutines launched @@ -220,7 +221,6 @@ type serviceRegistration struct { // service/proxy defaults. type serviceConfigWatch struct { registration *serviceRegistration - defaults *structs.ServiceConfigResponse agent *Agent registerCh chan<- *asyncRegisterRequest @@ -239,7 +239,7 @@ type serviceConfigWatch struct { // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart( ctx context.Context, - previousDefaults *structs.ServiceConfigResponse, + serviceDefaults *structs.ServiceConfigResponse, waitForCentralConfig bool, persistServiceConfig bool, wg *sync.WaitGroup, @@ -251,16 +251,16 @@ func (w *serviceConfigWatch) RegisterAndStart( // operation. Either way the watcher will end up with something flagged // as defaults even if they don't actually reflect actual defaults. if waitForCentralConfig { - if err := w.fetchDefaults(ctx); err != nil { + var err error + serviceDefaults, err = w.fetchDefaults(ctx) + if err != nil { return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) } - } else { - w.defaults = previousDefaults } // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } @@ -272,7 +272,7 @@ func (w *serviceConfigWatch) RegisterAndStart( service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: persistServiceConfig, token: w.registration.token, @@ -289,22 +289,20 @@ func (w *serviceConfigWatch) RegisterAndStart( } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { +func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) { req := makeConfigRequest(w.agent, w.registration) raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) if err != nil { - return err + return nil, err } - reply, ok := raw.(*structs.ServiceConfigResponse) + serviceConfig, ok := raw.(*structs.ServiceConfigResponse) if !ok { // This should never happen, but we want to protect against panics - return fmt.Errorf("internal error: response type not correct") + return nil, fmt.Errorf("internal error: response type not correct") } - - w.defaults = reply - return nil + return serviceConfig, nil } // Start starts the config watch and a goroutine to handle updates over the @@ -381,7 +379,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return fmt.Errorf("error watching service config: %v", event.Err) } - res, ok := event.Result.(*structs.ServiceConfigResponse) + serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse) if !ok { return fmt.Errorf("unknown update event type: %T", event) } @@ -393,11 +391,10 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // delivered) the correct config so just ignore this old message. return nil } - w.defaults = res // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } @@ -415,7 +412,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: true, token: w.registration.token, @@ -484,19 +481,17 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs return req } -// mergeServiceConfig returns the final effective config for the watched service, -// including the latest known global defaults from the servers. -// -// NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) { - if w.defaults == nil { - return w.registration.service, nil +// mergeServiceConfig from service into defaults to produce the final effective +// config for the watched service. +func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { + if defaults == nil { + return service, nil } // We don't want to change s.registration in place since it is our source of // truth about what was actually registered before defaults applied. So copy // it first. - nsRaw, err := copystructure.Copy(w.registration.service) + nsRaw, err := copystructure.Copy(service) if err != nil { return nil, err } @@ -504,16 +499,16 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) // Merge proxy defaults ns := nsRaw.(*structs.NodeService) - if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil { + if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { return nil, err } - if err := mergo.Merge(&ns.Proxy.Expose, w.defaults.Expose); err != nil { + if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { return nil, err } if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { - ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode + ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode } // Merge upstream defaults if there were any returned @@ -529,7 +524,7 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode } - usCfg, ok := w.defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) + usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) if !ok { // No config defaults to merge continue