diff --git a/agent/service_manager.go b/agent/service_manager.go index 5a163f3596..a20a00d198 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -27,10 +27,11 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for processing service registrations in the - // background when watches are notified of changes. All sends and receives - // must also obey the ctx.Done() channel to avoid a deadlock during - // shutdown. + // registerCh is a channel for receiving service registration requests from + // serviceConfigWatchers. + // The registrations are handled in the background when watches are notified of + // changes. All sends and receives must also obey the ctx.Done() channel to + // avoid a deadlock during shutdown. registerCh chan *asyncRegisterRequest // ctx is the shared context for all goroutines launched @@ -167,16 +168,15 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error { // merged config. watch := &serviceConfigWatch{ registration: reg, - updateCh: make(chan cache.UpdateEvent, 1), agent: s.agent, registerCh: s.registerCh, } err := watch.RegisterAndStart( + s.ctx, previousDefaults, waitForCentralConfig, persistServiceConfig, - s.ctx, &s.running, ) if err != nil { @@ -220,7 +220,6 @@ type serviceRegistration struct { // service/proxy defaults. type serviceConfigWatch struct { registration *serviceRegistration - defaults *structs.ServiceConfigResponse agent *Agent registerCh chan<- *asyncRegisterRequest @@ -229,39 +228,34 @@ type serviceConfigWatch struct { // we check to see if a new cache watch is needed. 
cacheKey string - // updateCh receives changes from cache watchers - updateCh chan cache.UpdateEvent - - ctx context.Context cancelFunc func() running sync.WaitGroup } // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) RegisterAndStart( - previousDefaults *structs.ServiceConfigResponse, + ctx context.Context, + serviceDefaults *structs.ServiceConfigResponse, waitForCentralConfig bool, persistServiceConfig bool, - ctx context.Context, wg *sync.WaitGroup, ) error { - service := w.registration.service - // Either we explicitly block waiting for defaults before registering, // or we feed it some seed data (or NO data) and bypass the blocking // operation. Either way the watcher will end up with something flagged // as defaults even if they don't actually reflect actual defaults. if waitForCentralConfig { - if err := w.fetchDefaults(ctx); err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) + var err error + serviceDefaults, err = w.fetchDefaults(ctx) + if err != nil { + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.service.ID, err) } - } else { - w.defaults = previousDefaults } // Merge the local registration with the central defaults and update this service // in the local state. 
- merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } @@ -273,7 +267,7 @@ func (w *serviceConfigWatch) RegisterAndStart( service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: persistServiceConfig, token: w.registration.token, @@ -290,22 +284,20 @@ func (w *serviceConfigWatch) RegisterAndStart( } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { +func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) { req := makeConfigRequest(w.agent, w.registration) raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) if err != nil { - return err + return nil, err } - reply, ok := raw.(*structs.ServiceConfigResponse) + serviceConfig, ok := raw.(*structs.ServiceConfigResponse) if !ok { // This should never happen, but we want to protect against panics - return fmt.Errorf("internal error: response type not correct") + return nil, fmt.Errorf("internal error: response type not correct") } - - w.defaults = reply - return nil + return serviceConfig, nil } // Start starts the config watch and a goroutine to handle updates over the @@ -314,13 +306,15 @@ func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { // // NOTE: this is called while holding the Agent.stateLock func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error { - w.ctx, w.cancelFunc = context.WithCancel(ctx) + ctx, w.cancelFunc = context.WithCancel(ctx) // Configure and start a cache.Notify goroutine to run a continuous // blocking query on the resolved service config for this service. 
req := makeConfigRequest(w.agent, w.registration) w.cacheKey = req.CacheInfo().Key + updateCh := make(chan cache.UpdateEvent, 1) + // We use the cache key as the correlationID here. Notify in general will not // respond on the updateCh after the context is cancelled however there could // possible be a race where it has only just got an update and checked the @@ -328,11 +322,11 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro // the cacheKey allows us to ignore updates from the old cache watch and makes // even this rare edge case safe. err := w.agent.cache.Notify( - w.ctx, + ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, - w.updateCh, + updateCh, ) if err != nil { w.cancelFunc() @@ -341,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro w.running.Add(1) wg.Add(1) - go w.runWatch(wg) + go w.runWatch(ctx, wg, updateCh) return nil } @@ -355,16 +349,16 @@ func (w *serviceConfigWatch) Stop() { // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) { +func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) { defer wg.Done() defer w.running.Done() for { select { - case <-w.ctx.Done(): + case <-ctx.Done(): return - case event := <-w.updateCh: - if err := w.handleUpdate(event); err != nil { + case event := <-updateCh: + if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } } @@ -375,14 +369,14 @@ func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) { // the local state and re-registers the service with the newly merged config. // // NOTE: the caller must NOT hold the Agent.stateLock! 
-func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { +func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error { // If we got an error, log a warning if this is the first update; otherwise return the error. // We want the initial update to cause a service registration no matter what. if event.Err != nil { return fmt.Errorf("error watching service config: %v", event.Err) } - res, ok := event.Result.(*structs.ServiceConfigResponse) + serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse) if !ok { return fmt.Errorf("unknown update event type: %T", event) } @@ -394,21 +388,18 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { // delivered) the correct config so just ignore this old message. return nil } - w.defaults = res // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := w.mergeServiceConfig() + merged, err := mergeServiceConfig(serviceDefaults, w.registration.service) if err != nil { return err } // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. 
- select { - case <-w.ctx.Done(): + if err := ctx.Err(); err != nil { return nil - default: } registerReq := &asyncRegisterRequest{ @@ -416,7 +407,7 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { service: merged, chkTypes: w.registration.chkTypes, persistService: w.registration.service, - persistDefaults: w.defaults, + persistDefaults: serviceDefaults, persist: w.registration.persist, persistServiceConfig: true, token: w.registration.token, @@ -427,13 +418,13 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { } select { - case <-w.ctx.Done(): + case <-ctx.Done(): return nil case w.registerCh <- registerReq: } select { - case <-w.ctx.Done(): + case <-ctx.Done(): return nil case err := <-registerReq.Reply: @@ -485,19 +476,17 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs return req } -// mergeServiceConfig returns the final effective config for the watched service, -// including the latest known global defaults from the servers. -// -// NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) { - if w.defaults == nil { - return w.registration.service, nil +// mergeServiceConfig merges defaults into a copy of service to produce the final +// effective config for the watched service. +func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { + if defaults == nil { + return service, nil } // We don't want to change s.registration in place since it is our source of // truth about what was actually registered before defaults applied. So copy // it first. 
- nsRaw, err := copystructure.Copy(w.registration.service) + nsRaw, err := copystructure.Copy(service) if err != nil { return nil, err } @@ -505,16 +494,16 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) // Merge proxy defaults ns := nsRaw.(*structs.NodeService) - if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil { + if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { return nil, err } - if err := mergo.Merge(&ns.Proxy.Expose, w.defaults.Expose); err != nil { + if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { return nil, err } if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { - ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode + ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode } // Merge upstream defaults if there were any returned @@ -530,7 +519,7 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode } - usCfg, ok := w.defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) + usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) if !ok { // No config defaults to merge continue